Whamcloud - gitweb
LU-1832 ldlm: fix double list add
[fs/lustre-release.git] / lustre / mds / mds_lov.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mds/mds_lov.c
37  *
38  * Lustre Metadata Server (mds) handling of striped file data
39  *
40  * Author: Peter Braam <braam@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MDS
44
45 #include <linux/module.h>
46 #include <lustre_mds.h>
47 #include <lustre/lustre_idl.h>
48 #include <obd_class.h>
49 #include <obd_lov.h>
50 #include <lustre_lib.h>
51 #include <lustre_fsfilt.h>
52 #include <obd_cksum.h>
53 #include <lustre_log.h>
54
55 #include "mds_internal.h"
56
57 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
58 {
59         struct mds_obd *mds = &obd->u.mds;
60         unsigned int i=0, j;
61
62         if ((libcfs_debug & D_INFO) == 0)
63                 return;
64
65         CDEBUG(D_INFO, "dump from %s\n", label);
66         if (mds->mds_lov_page_dirty == NULL) {
67                 CERROR("NULL bitmap!\n");
68                 GOTO(skip_bitmap, i);
69         }
70
71         for(i = 0; i < mds->mds_lov_page_dirty->size / BITS_PER_LONG + 1; i++)
72                 CDEBUG(D_INFO, "%u - %lx\n", i,
73                        mds->mds_lov_page_dirty->data[i]);
74 skip_bitmap:
75         if (mds->mds_lov_page_array == NULL) {
76                 CERROR("not init page array!\n");
77                 GOTO(skip_array, i);
78
79         }
80         for(i = 0;i < MDS_LOV_OBJID_PAGES_COUNT; i++) {
81                 obd_id *data = mds->mds_lov_page_array[i];
82
83                 if (data == NULL)
84                         continue;
85
86                 for(j=0; j < OBJID_PER_PAGE(); j++) {
87                         if (data[j] == 0)
88                                 continue;
89                         CDEBUG(D_INFO,"objid page %u idx %u - "LPU64" \n",
90                                i, j, data[j]);
91                 }
92         }
93 skip_array:
94         EXIT;
95 }
96
97 int mds_lov_init_objids(struct obd_device *obd)
98 {
99         struct mds_obd *mds = &obd->u.mds;
100         int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
101         struct file *file;
102         int rc;
103         ENTRY;
104
105         CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
106
107         mds->mds_lov_page_dirty =
108                 CFS_ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
109         if (mds->mds_lov_page_dirty == NULL)
110                 RETURN(-ENOMEM);
111
112
113         OBD_ALLOC(mds->mds_lov_page_array, size);
114         if (mds->mds_lov_page_array == NULL)
115                 GOTO(err_free_bitmap, rc = -ENOMEM);
116
117         /* open and test the lov objd file */
118         file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
119         if (IS_ERR(file)) {
120                 rc = PTR_ERR(file);
121                 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
122                 GOTO(err_free, rc = PTR_ERR(file));
123         }
124         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
125                 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
126                        file->f_dentry->d_inode->i_mode);
127                 GOTO(err_open, rc = -ENOENT);
128         }
129         mds->mds_lov_objid_filp = file;
130
131         RETURN (0);
132 err_open:
133         if (filp_close((struct file *)file, 0))
134                 CERROR("can't close %s after error\n", LOV_OBJID);
135 err_free:
136         OBD_FREE(mds->mds_lov_page_array, size);
137 err_free_bitmap:
138         CFS_FREE_BITMAP(mds->mds_lov_page_dirty);
139
140         RETURN(rc);
141 }
142
143 void mds_lov_destroy_objids(struct obd_device *obd)
144 {
145         struct mds_obd *mds = &obd->u.mds;
146         int i, rc;
147         ENTRY;
148
149         if (mds->mds_lov_page_array != NULL) {
150                 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
151                         obd_id *data = mds->mds_lov_page_array[i];
152                         if (data != NULL)
153                                 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
154                 }
155                 OBD_FREE(mds->mds_lov_page_array,
156                          MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
157         }
158
159         if (mds->mds_lov_objid_filp) {
160                 rc = filp_close((struct file *)mds->mds_lov_objid_filp, NULL);
161                 mds->mds_lov_objid_filp = NULL;
162                 if (rc)
163                         CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
164         }
165
166         CFS_FREE_BITMAP(mds->mds_lov_page_dirty);
167         EXIT;
168 }
169
170 /**
171  * currently exist two ways for know about ost count and max ost index.
172  * first - after ost is connected to mds and sync process finished
173  * second - get from lmm in recovery process, in case when mds not have configs,
174  * and ost isn't registered in mgs.
175  *
176  * \param mds pointer to mds structure
177  * \param index maxium ost index
178  *
179  * \retval -ENOMEM is not hame memory for new page
180  * \retval 0 is update passed
181  */
182 static int mds_lov_update_max_ost(struct mds_obd *mds, obd_id index)
183 {
184         __u32 page = index / OBJID_PER_PAGE();
185         __u32 off = index % OBJID_PER_PAGE();
186         obd_id *data =  mds->mds_lov_page_array[page];
187
188         if (data == NULL) {
189                 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
190                 if (data == NULL)
191                         RETURN(-ENOMEM);
192
193                 mds->mds_lov_page_array[page] = data;
194         }
195
196         if (index > mds->mds_lov_objid_max_index) {
197                 mds->mds_lov_objid_lastpage = page;
198                 mds->mds_lov_objid_lastidx = off;
199                 mds->mds_lov_objid_max_index = index;
200         }
201
202         /* workaround - New target not in objids file; increase mdsize */
203         /* ld_tgt_count is used as the max index everywhere, despite its name. */
204         if (data[off] == 0) {
205                 __u32 max_easize;
206                 __u32 stripes;
207
208                 max_easize = mds->mds_obt.obt_osd_properties.osd_max_ea_size;
209                 data[off] = 1;
210                 mds->mds_lov_objid_count++;
211                 stripes = min(lov_mds_md_stripecnt(max_easize, LOV_MAGIC_V3),
212                               mds->mds_lov_objid_count);
213
214                 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
215                 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
216
217                 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d"
218                        " stripes: %d/%d\n", stripes, mds->mds_max_mdsize,
219                        mds->mds_max_cookiesize);
220         }
221
222         EXIT;
223         return 0;
224 }
225
226 static int mds_lov_objinit(struct mds_obd *mds, __u32 index)
227 {
228         __u32 page = index / OBJID_PER_PAGE();
229         __u32 off = index % OBJID_PER_PAGE();
230         obd_id *data =  mds->mds_lov_page_array[page];
231
232         return (data[off] > 0);
233 }
234
235 int mds_lov_prepare_objids(struct obd_device *obd, struct lov_mds_md *lmm)
236 {
237         struct lov_ost_data_v1 *data;
238         __u16 count;
239         int rc = 0;
240         __u32 j;
241
242         /* if we create file without objects - lmm is NULL */
243         if (lmm == NULL)
244                 return 0;
245
246         switch (le32_to_cpu(lmm->lmm_magic)) {
247                 case LOV_MAGIC_V1:
248                         count = le16_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
249                         data = &(((struct lov_mds_md_v1*)lmm)->lmm_objects[0]);
250                         break;
251                 case LOV_MAGIC_V3:
252                         count = le16_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
253                         data = &(((struct lov_mds_md_v3*)lmm)->lmm_objects[0]);
254                         break;
255                 default:
256                         CERROR("Unknow lmm type %X!\n", le32_to_cpu(lmm->lmm_magic));
257                         RETURN(-EINVAL);
258         }
259
260
261         cfs_mutex_lock(&obd->obd_dev_mutex);
262         for (j = 0; j < count; j++) {
263                 __u32 i = le32_to_cpu(data[j].l_ost_idx);
264                 if (mds_lov_update_max_ost(&obd->u.mds, i)) {
265                         rc = -ENOMEM;
266                         break;
267                 }
268         }
269         cfs_mutex_unlock(&obd->obd_dev_mutex);
270
271         RETURN(rc);
272 }
273 EXPORT_SYMBOL(mds_lov_prepare_objids);
274
275 /*
276  * write llog orphan record about lost ost object,
277  * Special lsm is allocated with single stripe, caller should deallocated it
278  * after use
279  */
280 static int mds_log_lost_precreated(struct obd_device *obd,
281                                    struct lov_stripe_md **lsmp, __u16 *stripes,
282                                    obd_id id, obd_count count, int idx)
283 {
284         struct lov_stripe_md *lsm = *lsmp;
285         int rc;
286         ENTRY;
287
288         if (*lsmp == NULL) {
289                 rc = obd_alloc_memmd(obd->u.mds.mds_lov_exp, &lsm);
290                 if (rc < 0)
291                         RETURN(rc);
292                 /* need only one stripe, save old value */
293                 *stripes = lsm->lsm_stripe_count;
294                 lsm->lsm_stripe_count = 1;
295                 *lsmp = lsm;
296         }
297
298         lsm->lsm_oinfo[0]->loi_id = id;
299         lsm->lsm_oinfo[0]->loi_seq = mdt_to_obd_objseq(obd->u.mds.mds_id);
300         lsm->lsm_oinfo[0]->loi_ost_idx = idx;
301
302         rc = mds_log_op_orphan(obd, lsm, count);
303         RETURN(rc);
304 }
305
306 void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm)
307 {
308         struct mds_obd *mds = &obd->u.mds;
309         int j;
310         struct lov_ost_data_v1 *obj;
311         struct lov_stripe_md *lsm = NULL;
312         __u16 stripes = 0;
313         int count;
314         ENTRY;
315
316         /* if we create file without objects - lmm is NULL */
317         if (lmm == NULL)
318                 return;
319
320         switch (le32_to_cpu(lmm->lmm_magic)) {
321                 case LOV_MAGIC_V1:
322                         count = le16_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
323                         obj = ((struct lov_mds_md_v1*)lmm)->lmm_objects;
324                         break;
325                 case LOV_MAGIC_V3:
326                         count = le16_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
327                         obj = ((struct lov_mds_md_v3*)lmm)->lmm_objects;
328                         break;
329                 default:
330                         CERROR("Unknow lmm type %X !\n",
331                                le32_to_cpu(lmm->lmm_magic));
332                         return;
333         }
334
335         for (j = 0; j < count; j++) {
336                 __u32 i = le32_to_cpu(obj[j].l_ost_idx);
337                 obd_id id = le64_to_cpu(obj[j].l_object_id);
338                 __u32 page = i / OBJID_PER_PAGE();
339                 __u32 idx = i % OBJID_PER_PAGE();
340                 obd_id *data;
341
342                 data = mds->mds_lov_page_array[page];
343
344                 CDEBUG(D_INODE,"update last object for ost %u"
345                        " - new "LPU64" old "LPU64"\n", i, id, data[idx]);
346                 if (id > data[idx]) {
347                         int lost = id - data[idx] - 1;
348                         /* we might have lost precreated objects due to VBR */
349                         if (lost > 0 && obd->obd_recovering) {
350                                 CDEBUG(D_HA, "Gap in objids is %u\n", lost);
351                                 if (!obd->obd_version_recov)
352                                         CERROR("Unexpected gap in objids\n");
353                                 /* lsm is allocated if NULL */
354                                 mds_log_lost_precreated(obd, &lsm, &stripes,
355                                                         data[idx]+1, lost, i);
356                         }
357                         data[idx] = id;
358                         cfs_bitmap_set(mds->mds_lov_page_dirty, page);
359                 }
360         }
361         if (lsm) {
362                 /* restore stripes number */
363                 lsm->lsm_stripe_count = stripes;
364                 obd_free_memmd(mds->mds_lov_exp, &lsm);
365         }
366         EXIT;
367         return;
368 }
369 EXPORT_SYMBOL(mds_lov_update_objids);
370
371 static int mds_lov_update_from_read(struct mds_obd *mds, obd_id *data,
372                                     __u32 count)
373 {
374         __u32 max_easize = mds->mds_obt.obt_osd_properties.osd_max_ea_size;
375         __u32 i, stripes;
376
377         for (i = 0; i < count; i++) {
378                 if (data[i] == 0)
379                         continue;
380
381                 mds->mds_lov_objid_count++;
382         }
383
384         stripes = min(lov_mds_md_stripecnt(max_easize, LOV_MAGIC_V3),
385                          mds->mds_lov_objid_count);
386
387         mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
388         mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
389
390         CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
391                "%d/%d\n", stripes, mds->mds_max_mdsize,
392                mds->mds_max_cookiesize);
393
394         EXIT;
395         return 0;
396 }
397
398 static int mds_lov_read_objids(struct obd_device *obd)
399 {
400         struct mds_obd *mds = &obd->u.mds;
401         loff_t off = 0;
402         int i, rc = 0, count = 0, page = 0;
403         unsigned long size;
404         ENTRY;
405
406         /* Read everything in the file, even if our current lov desc
407            has fewer targets. Old targets not in the lov descriptor
408            during mds setup may still have valid objids. */
409         size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
410         if (size == 0)
411                 RETURN(0);
412
413         page = (size + MDS_LOV_ALLOC_SIZE - 1) / MDS_LOV_ALLOC_SIZE;
414         CDEBUG(D_INFO, "file size %lu pages %d\n", size, page);
415         for (i = 0; i < page; i++) {
416                 obd_id *data;
417                 loff_t off_old = off;
418
419                 LASSERT(mds->mds_lov_page_array[i] == NULL);
420                 OBD_ALLOC(mds->mds_lov_page_array[i], MDS_LOV_ALLOC_SIZE);
421                 if (mds->mds_lov_page_array[i] == NULL)
422                         GOTO(out, rc = -ENOMEM);
423
424                 data = mds->mds_lov_page_array[i];
425
426                 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
427                                         MDS_LOV_ALLOC_SIZE, &off);
428                 if (rc < 0) {
429                         CERROR("Error reading objids %d\n", rc);
430                         GOTO(out, rc);
431                 }
432                 if (off == off_old) /* hole is read */
433                         off += MDS_LOV_ALLOC_SIZE;
434
435                 count = (off - off_old) / sizeof(obd_id);
436                 if (mds_lov_update_from_read(mds, data, count)) {
437                         CERROR("Can't update mds data\n");
438                         GOTO(out, rc = -EIO);
439                 }
440         }
441         mds->mds_lov_objid_lastpage = page - 1;
442         mds->mds_lov_objid_lastidx = count - 1;
443
444         CDEBUG(D_INFO, "Read %u - %u %u objid\n", mds->mds_lov_objid_count,
445                mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
446 out:
447         mds_lov_dump_objids("read",obd);
448
449         RETURN(rc);
450 }
451
452 int mds_lov_write_objids(struct obd_device *obd)
453 {
454         struct mds_obd *mds = &obd->u.mds;
455         int i = 0, rc = 0;
456         ENTRY;
457
458         if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
459                 RETURN(0);
460
461         mds_lov_dump_objids("write", obd);
462
463         cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
464                 obd_id *data =  mds->mds_lov_page_array[i];
465                 unsigned int size = MDS_LOV_ALLOC_SIZE;
466                 loff_t off = i * size;
467
468                 LASSERT(data != NULL);
469
470                 if (!cfs_bitmap_test_and_clear(mds->mds_lov_page_dirty, i))
471                         continue;
472
473                 /* check for particaly filled last page */
474                 if (i == mds->mds_lov_objid_lastpage)
475                         size = (mds->mds_lov_objid_lastidx+1) * sizeof(obd_id);
476
477                 CDEBUG(D_INFO, "write %lld - %u\n", off, size);
478                 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
479                                          size, &off, 0);
480                 if (rc < 0) {
481                         cfs_bitmap_set(mds->mds_lov_page_dirty, i);
482                         break;
483                 }
484         }
485         if (rc >= 0)
486                 rc = 0;
487
488         RETURN(rc);
489 }
490 EXPORT_SYMBOL(mds_lov_write_objids);
491
492 static int mds_lov_get_objid(struct obd_device * obd,
493                              obd_id idx)
494 {
495         struct mds_obd *mds = &obd->u.mds;
496         struct obd_export *lov_exp = mds->mds_lov_exp;
497         unsigned int page;
498         unsigned int off;
499         obd_id *data;
500         __u32 size;
501         int rc = 0;
502         ENTRY;
503
504         page = idx / OBJID_PER_PAGE();
505         off = idx % OBJID_PER_PAGE();
506         data = mds->mds_lov_page_array[page];
507
508         if (data[off] < 2) {
509                 /* We never read this lastid; ask the osc */
510                 struct obd_id_info lastid;
511
512                 size = sizeof(lastid);
513                 lastid.idx = idx;
514                 lastid.data = &data[off];
515                 rc = obd_get_info(NULL, lov_exp, sizeof(KEY_LAST_ID),
516                                   KEY_LAST_ID, &size, &lastid, NULL);
517                 if (rc)
518                         GOTO(out, rc);
519
520                 /* workaround for clean filter */
521                 if (data[off] == 0)
522                         data[off] = 1;
523
524                 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
525         }
526         CDEBUG(D_INFO, "idx "LPU64" - %p - %d/%d - "LPU64"\n",
527                idx, data, page, off, data[off]);
528 out:
529         RETURN(rc);
530 }
531
532 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
533 {
534         int rc;
535         struct obdo oa = { 0 };
536         struct obd_trans_info oti = {0};
537         struct lov_stripe_md  *empty_ea = NULL;
538         ENTRY;
539
540         LASSERT(mds->mds_lov_page_array != NULL);
541
542         /* This create will in fact either create or destroy:  If the OST is
543          * missing objects below this ID, they will be created.  If it finds
544          * objects above this ID, they will be removed. */
545         memset(&oa, 0, sizeof(oa));
546         oa.o_flags = OBD_FL_DELORPHAN;
547         oa.o_seq = mdt_to_obd_objseq(mds->mds_id);
548         oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
549         if (ost_uuid != NULL)
550                 oti.oti_ost_uuid = ost_uuid;
551
552         rc = obd_create(NULL, mds->mds_lov_exp, &oa, &empty_ea, &oti);
553
554         RETURN(rc);
555 }
556
557 /* for one target */
558 static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
559 {
560         struct mds_obd *mds = &obd->u.mds;
561         int rc;
562         struct obd_id_info info;
563         ENTRY;
564
565         LASSERT(!obd->obd_recovering);
566
567         info.idx = idx;
568         info.data = id;
569         rc = obd_set_info_async(NULL, mds->mds_lov_exp, sizeof(KEY_NEXT_ID),
570                                 KEY_NEXT_ID, sizeof(info), &info, NULL);
571         if (rc)
572                 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
573                         obd->obd_name, rc);
574
575         RETURN(rc);
576 }
577
578 /* Update the lov desc for a new size lov. */
579 static int mds_lov_update_desc(struct obd_device *obd, int idx,
580                                struct obd_uuid *uuid)
581 {
582         struct mds_obd *mds = &obd->u.mds;
583         struct lov_desc *ld;
584         __u32 valsize = sizeof(mds->mds_lov_desc);
585         int rc = 0;
586         ENTRY;
587
588         OBD_ALLOC(ld, sizeof(*ld));
589         if (!ld)
590                 RETURN(-ENOMEM);
591
592         rc = obd_get_info(NULL, mds->mds_lov_exp, sizeof(KEY_LOVDESC),
593                           KEY_LOVDESC, &valsize, ld, NULL);
594         if (rc)
595                 GOTO(out, rc);
596
597         /* Don't change the mds_lov_desc until the objids size matches the
598            count (paranoia) */
599         mds->mds_lov_desc = *ld;
600         CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d - idx %d / uuid %s\n",
601                mds->mds_lov_desc.ld_tgt_count, idx, uuid->uuid);
602
603         cfs_mutex_lock(&obd->obd_dev_mutex);
604         rc = mds_lov_update_max_ost(mds, idx);
605         cfs_mutex_unlock(&obd->obd_dev_mutex);
606         if (rc != 0)
607                 GOTO(out, rc );
608
609         /* If we added a target we have to reconnect the llogs */
610         /* We only _need_ to do this at first add (idx), or the first time
611            after recovery.  However, it should now be safe to call anytime. */
612         rc = obd_llog_init(obd, &obd->obd_olg, obd, &idx);
613         if (rc)
614                 GOTO(out, rc);
615
616 out:
617         OBD_FREE(ld, sizeof(*ld));
618         RETURN(rc);
619 }
620
621 /* Inform MDS about new/updated target */
622 static int mds_lov_update_mds(struct obd_device *obd,
623                               struct obd_device *watched,
624                               __u32 idx)
625 {
626         struct mds_obd *mds = &obd->u.mds;
627         int rc = 0;
628         int page;
629         int off;
630         obd_id *data;
631         ENTRY;
632
633         LASSERT(mds_lov_objinit(mds, idx));
634
635         CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d\n",
636                idx, obd->obd_recovering, obd->obd_async_recov,
637                mds->mds_lov_desc.ld_tgt_count);
638
639         /* idx is set as data from lov_notify. */
640         if (obd->obd_recovering)
641                 GOTO(out, rc);
642
643         if (idx >= mds->mds_lov_desc.ld_tgt_count) {
644                 CERROR("index %d > count %d!\n", idx,
645                        mds->mds_lov_desc.ld_tgt_count);
646                 GOTO(out, rc = -EINVAL);
647         }
648
649         rc = mds_lov_get_objid(obd, idx);
650         if (rc)
651                 GOTO(out, rc);
652
653         page = idx / OBJID_PER_PAGE();
654         off = idx % OBJID_PER_PAGE();
655         data = mds->mds_lov_page_array[page];
656
657         /* We have read this lastid from disk; tell the osc.
658            Don't call this during recovery. */
659         rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
660         if (rc) {
661                 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
662                 /* Don't abort the rest of the sync */
663                 rc = 0;
664         } else {
665                 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
666                         data[off], idx, rc);
667         }
668 out:
669         RETURN(rc);
670 }
671
672 /* update the LOV-OSC knowledge of the last used object id's */
673 int mds_lov_connect(struct obd_device *obd, char * lov_name)
674 {
675         struct mds_obd *mds = &obd->u.mds;
676         struct obd_connect_data *data;
677         int rc;
678         ENTRY;
679
680         if (IS_ERR(mds->mds_lov_obd))
681                 RETURN(PTR_ERR(mds->mds_lov_obd));
682
683         if (mds->mds_lov_obd)
684                 RETURN(0);
685
686         mds->mds_lov_obd = class_name2obd(lov_name);
687         if (!mds->mds_lov_obd) {
688                 CERROR("MDS cannot locate LOV %s\n", lov_name);
689                 mds->mds_lov_obd = ERR_PTR(-ENOTCONN);
690                 RETURN(-ENOTCONN);
691         }
692
693         cfs_mutex_lock(&obd->obd_dev_mutex);
694         rc = mds_lov_read_objids(obd);
695         cfs_mutex_unlock(&obd->obd_dev_mutex);
696         if (rc) {
697                 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
698                 GOTO(err_exit, rc);
699         }
700
701         rc = obd_register_observer(mds->mds_lov_obd, obd);
702         if (rc) {
703                 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
704                        lov_name, rc);
705                 GOTO(err_exit, rc);
706         }
707
708         /* ask lov to generate OBD_NOTIFY_CREATE events for already registered
709          * targets */
710         obd_notify(mds->mds_lov_obd, NULL, OBD_NOTIFY_CREATE, NULL);
711
712         mds->mds_lov_obd->u.lov.lov_sp_me = LUSTRE_SP_MDT;
713
714         OBD_ALLOC(data, sizeof(*data));
715         if (data == NULL)
716                 GOTO(err_exit, rc = -ENOMEM);
717
718         data->ocd_connect_flags = OBD_CONNECT_VERSION   | OBD_CONNECT_INDEX   |
719                                   OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
720                                   OBD_CONNECT_OSS_CAPA  | OBD_CONNECT_FULL20  |
721                                   OBD_CONNECT_CHANGE_QS | OBD_CONNECT_AT      |
722                                   OBD_CONNECT_MDS | OBD_CONNECT_SKIP_ORPHAN   |
723                                   OBD_CONNECT_SOM | OBD_CONNECT_MAX_EASIZE;
724 #ifdef HAVE_LRU_RESIZE_SUPPORT
725         data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
726 #endif
727         data->ocd_version = LUSTRE_VERSION_CODE;
728         data->ocd_group = mdt_to_obd_objseq(mds->mds_id);
729         data->ocd_max_easize = mds->mds_obt.obt_osd_properties.osd_max_ea_size;
730
731         /* send max bytes per rpc */
732         data->ocd_brw_size = PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT;
733         /* send the list of supported checksum types */
734         data->ocd_cksum_types = cksum_types_supported_client();
735         /* NB: lov_connect() needs to fill in .ocd_index for each OST */
736         rc = obd_connect(NULL, &mds->mds_lov_exp, mds->mds_lov_obd, &obd->obd_uuid, data, NULL);
737         OBD_FREE(data, sizeof(*data));
738         if (rc) {
739                 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
740                 mds->mds_lov_obd = ERR_PTR(rc);
741                 RETURN(rc);
742         }
743
744         /* I want to see a callback happen when the OBD moves to a
745          * "For General Use" state, and that's when we'll call
746          * set_nextid().  The class driver can help us here, because
747          * it can use the obd_recovering flag to determine when the
748          * the OBD is full available. */
749         /* MDD device will care about that
750         if (!obd->obd_recovering)
751                 rc = mds_postrecov(obd);
752          */
753         RETURN(rc);
754
755 err_exit:
756         mds->mds_lov_exp = NULL;
757         mds->mds_lov_obd = ERR_PTR(rc);
758         RETURN(rc);
759 }
760
761 int mds_lov_disconnect(struct obd_device *obd)
762 {
763         struct mds_obd *mds = &obd->u.mds;
764         int rc = 0;
765         ENTRY;
766
767         if (!IS_ERR(mds->mds_lov_obd) && mds->mds_lov_exp != NULL) {
768                 obd_register_observer(mds->mds_lov_obd, NULL);
769
770                 /* The actual disconnect of the mds_lov will be called from
771                  * class_disconnect_exports from mds_lov_clean. So we have to
772                  * ensure that class_cleanup doesn't fail due to the extra ref
773                  * we're holding now. The mechanism to do that already exists -
774                  * the obd_force flag. We'll drop the final ref to the
775                  * mds_lov_exp in mds_cleanup. */
776                 mds->mds_lov_obd->obd_force = 1;
777         }
778
779         RETURN(rc);
780 }
781
782 struct mds_lov_sync_info {
783         struct obd_device    *mlsi_obd;     /* the lov device to sync */
784         struct obd_device    *mlsi_watched; /* target osc */
785         __u32                 mlsi_index;   /* index of target */
786 };
787
788 static int mds_propagate_capa_keys(struct mds_obd *mds, struct obd_uuid *uuid)
789 {
790         struct mds_capa_info    info = { .uuid = uuid };
791         struct lustre_capa_key *key;
792         int i, rc = 0;
793
794         ENTRY;
795
796         if (!mds->mds_capa_keys)
797                 RETURN(0);
798
799         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_SYNC_CAPA_SL, 5);
800         for (i = 0; i < 2; i++) {
801                 key = &mds->mds_capa_keys[i];
802                 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
803
804                 info.capa = key;
805                 rc = obd_set_info_async(NULL, mds->mds_lov_exp,
806                                         sizeof(KEY_CAPA_KEY), KEY_CAPA_KEY,
807                                         sizeof(info), &info, NULL);
808                 if (rc) {
809                         DEBUG_CAPA_KEY(D_ERROR, key,
810                                        "propagate failed (rc = %d) for", rc);
811                         RETURN(rc);
812                 }
813         }
814
815         RETURN(0);
816 }
817
818 /* We only sync one osc at a time, so that we don't have to hold
819    any kind of lock on the whole mds_lov_desc, which may change
820    (grow) as a result of mds_lov_add_ost.  This also avoids any
821    kind of mismatch between the lov_desc and the mds_lov_desc,
822    which are not in lock-step during lov_add_obd */
823 static int __mds_lov_synchronize(void *data)
824 {
825         struct mds_lov_sync_info *mlsi = data;
826         struct obd_device *obd = mlsi->mlsi_obd;
827         struct obd_device *watched = mlsi->mlsi_watched;
828         struct mds_obd *mds = &obd->u.mds;
829         struct obd_uuid *uuid;
830         __u32  idx = mlsi->mlsi_index;
831         struct mds_group_info mgi;
832         struct llog_ctxt *ctxt;
833         int rc = 0;
834         ENTRY;
835
836         OBD_FREE_PTR(mlsi);
837
838         LASSERT(obd);
839         LASSERT(watched);
840         uuid = &watched->u.cli.cl_target_uuid;
841         LASSERT(uuid);
842
843         cfs_down_read(&mds->mds_notify_lock);
844         if (obd->obd_stopping || obd->obd_fail)
845                 GOTO(out, rc = -ENODEV);
846
847         OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
848         rc = mds_lov_update_mds(obd, watched, idx);
849         if (rc != 0) {
850                 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
851                 GOTO(out, rc);
852         }
853         mgi.group = mdt_to_obd_objseq(mds->mds_id);
854         mgi.uuid = uuid;
855
856         rc = obd_set_info_async(NULL, mds->mds_lov_exp, sizeof(KEY_MDS_CONN),
857                                 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
858         if (rc != 0)
859                 GOTO(out, rc);
860         /* propagate capability keys */
861         rc = mds_propagate_capa_keys(mds, uuid);
862         if (rc)
863                 GOTO(out, rc);
864
865         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
866         if (!ctxt)
867                 GOTO(out, rc = -ENODEV);
868
869         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
870         rc = llog_connect(ctxt, NULL, NULL, uuid);
871         llog_ctxt_put(ctxt);
872         if (rc != 0) {
873                 CERROR("%s failed at llog_origin_connect: %d\n",
874                        obd_uuid2str(uuid), rc);
875                 GOTO(out, rc);
876         }
877
878         LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
879               obd->obd_name, obd_uuid2str(uuid));
880
881         rc = mds_lov_clear_orphans(mds, uuid);
882         if (rc != 0) {
883                 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
884                        obd_uuid2str(uuid), rc);
885                 GOTO(out, rc);
886         }
887
888         EXIT;
889 out:
890         if (rc) {
891                 /* Deactivate it for safety */
892                 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
893                        rc);
894                 if (!obd->obd_stopping && mds->mds_lov_obd &&
895                     !mds->mds_lov_obd->obd_stopping && !watched->obd_stopping)
896                         obd_notify(mds->mds_lov_obd, watched,
897                                    OBD_NOTIFY_INACTIVE, NULL);
898         }
899         cfs_up_read(&mds->mds_notify_lock);
900
901         class_decref(obd, "mds_lov_synchronize", obd);
902         return rc;
903 }
904
905 int mds_lov_synchronize(void *data)
906 {
907         struct mds_lov_sync_info *mlsi = data;
908         char name[20];
909
910         snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
911         cfs_daemonize_ctxt(name);
912
913         RETURN(__mds_lov_synchronize(data));
914 }
915
916 int mds_lov_start_synchronize(struct obd_device *obd,
917                               struct obd_device *watched,
918                               void *data, enum obd_notify_event ev)
919 {
920         struct mds_lov_sync_info *mlsi;
921         int rc;
922         struct obd_uuid *uuid;
923         ENTRY;
924
925         LASSERT(watched);
926         uuid = &watched->u.cli.cl_target_uuid;
927
928         OBD_ALLOC(mlsi, sizeof(*mlsi));
929         if (mlsi == NULL)
930                 RETURN(-ENOMEM);
931
932         LASSERT(data);
933         mlsi->mlsi_obd = obd;
934         mlsi->mlsi_watched = watched;
935         mlsi->mlsi_index = *(__u32 *)data;
936
937         /* Although class_export_get(obd->obd_self_export) would lock
938            the MDS in place, since it's only a self-export
939            it doesn't lock the LOV in place.  The LOV can be disconnected
940            during MDS precleanup, leaving nothing for __mds_lov_synchronize.
941            Simply taking an export ref on the LOV doesn't help, because it's
942            still disconnected. Taking an obd reference insures that we don't
943            disconnect the LOV.  This of course means a cleanup won't
944            finish for as long as the sync is blocking. */
945         class_incref(obd, "mds_lov_synchronize", obd);
946
947         if (ev != OBD_NOTIFY_SYNC) {
948                 /* Synchronize in the background */
949                 rc = cfs_create_thread(mds_lov_synchronize, mlsi,
950                                        CFS_DAEMON_FLAGS);
951                 if (rc < 0) {
952                         CERROR("%s: error starting mds_lov_synchronize: %d\n",
953                                obd->obd_name, rc);
954                         class_decref(obd, "mds_lov_synchronize", obd);
955                 } else {
956                         CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
957                                "thread=%d\n", obd->obd_name,
958                                mlsi->mlsi_index, rc);
959                         rc = 0;
960                 }
961         } else {
962                 rc = __mds_lov_synchronize((void *)mlsi);
963         }
964
965         RETURN(rc);
966 }
967
968 int mds_notify(struct obd_device *obd, struct obd_device *watched,
969                enum obd_notify_event ev, void *data)
970 {
971         struct mds_obd *mds = &obd->u.mds;
972         int rc = 0;
973         ENTRY;
974
975         CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
976
977         if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
978                 CERROR("unexpected notification of %s %s!\n",
979                        watched->obd_type->typ_name, watched->obd_name);
980                 RETURN(-EINVAL);
981         }
982
983         /*XXX this notifies the MDD until lov handling use old mds code
984          * must non block!
985          */
986         if (obd->obd_upcall.onu_owner) {
987                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
988                  rc = obd->obd_upcall.onu_upcall(obd, NULL, ev,
989                                                  obd->obd_upcall.onu_owner,
990                                                  &mds->mds_obt.obt_mount_count);
991         }
992
993         switch (ev) {
994         /* We only handle these: */
995         case OBD_NOTIFY_CREATE:
996                 CDEBUG(D_CONFIG, "%s: add target %s\n", obd->obd_name,
997                        obd_uuid2str(&watched->u.cli.cl_target_uuid));
998                 /* We still have to fix the lov descriptor for ost's */
999                 LASSERT(data);
1000                 rc = mds_lov_update_desc(obd, *(__u32 *)data,
1001                                           &watched->u.cli.cl_target_uuid);
1002                 RETURN(rc);
1003         case OBD_NOTIFY_ACTIVE:
1004                 /* lov want one or more _active_ targets for work */
1005                 /* activate event should be pass lov idx as argument */
1006         case OBD_NOTIFY_SYNC:
1007         case OBD_NOTIFY_SYNC_NONBLOCK:
1008                 /* sync event should be pass lov idx as argument */
1009                 break;
1010         default:
1011                 RETURN(0);
1012         }
1013
1014         if (obd->obd_recovering) {
1015                 CDEBUG(D_CONFIG, "%s: Is in recovery, "
1016                        "not resetting orphans on %s\n",
1017                        obd->obd_name,
1018                        obd_uuid2str(&watched->u.cli.cl_target_uuid));
1019                 /* We still have to fix the lov descriptor for ost's added
1020                    after the mdt in the config log.  They didn't make it into
1021                    mds_lov_connect. */
1022                 rc = mds_lov_update_desc(obd, *(__u32 *)data,
1023                                          &watched->u.cli.cl_target_uuid);
1024         } else {
1025                 rc = mds_lov_start_synchronize(obd, watched, data, ev);
1026         }
1027         RETURN(rc);
1028 }