Whamcloud - gitweb
Branch: b1_4
[fs/lustre-release.git] / lustre / lov / lov_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #ifndef EXPORT_SYMTAB
23 # define EXPORT_SYMTAB
24 #endif
25 #define DEBUG_SUBSYSTEM S_LOV
26
27 #ifdef __KERNEL__
28 #include <asm/div64.h>
29 #else
30 #include <liblustre.h>
31 #endif
32
33 #include <linux/obd_class.h>
34 #include <linux/obd_lov.h>
35 #include <linux/lustre_idl.h>
36
37 #include "lov_internal.h"
38
39 static void lov_init_set(struct lov_request_set *set)
40 {
41         set->set_count = 0;
42         set->set_completes = 0;
43         set->set_success = 0;
44         INIT_LIST_HEAD(&set->set_list);
45         atomic_set(&set->set_refcount, 1);
46 }
47
48 static void lov_finish_set(struct lov_request_set *set)
49 {
50         struct list_head *pos, *n;
51         ENTRY;
52
53         LASSERT(set);
54         list_for_each_safe(pos, n, &set->set_list) {
55                 struct lov_request *req = list_entry(pos, struct lov_request,
56                                                      rq_link);
57                 list_del_init(&req->rq_link);
58
59                 if (req->rq_oa)
60                         obdo_free(req->rq_oa);
61                 if (req->rq_md)
62                         OBD_FREE(req->rq_md, req->rq_buflen);
63                 OBD_FREE(req, sizeof(*req));
64         }
65
66         if (set->set_pga) {
67                 int len = set->set_oabufs * sizeof(*set->set_pga);
68                 OBD_FREE(set->set_pga, len);
69         }
70         if (set->set_lockh)
71                 lov_llh_put(set->set_lockh);
72
73         OBD_FREE(set, sizeof(*set));
74         EXIT;
75 }
76
77 static void lov_update_set(struct lov_request_set *set,
78                            struct lov_request *req, int rc)
79 {
80         req->rq_complete = 1;
81         req->rq_rc = rc;
82
83         set->set_completes++;
84         if (rc == 0)
85                 set->set_success++;
86 }
87
88 int lov_update_common_set(struct lov_request_set *set,
89                           struct lov_request *req, int rc)
90 {
91         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
92         ENTRY;
93
94         lov_update_set(set, req, rc);
95
96         /* grace error on inactive ost */
97         if (rc && !lov->tgts[req->rq_idx].active)
98                 rc = 0;
99
100         /* FIXME in raid1 regime, should return 0 */
101         RETURN(rc);
102 }
103
104 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
105 {
106         list_add_tail(&req->rq_link, &set->set_list);
107         set->set_count++;
108 }
109
110 int lov_update_enqueue_set(struct lov_request_set *set,
111                            struct lov_request *req, int rc, int flags)
112 {
113         struct lustre_handle *lov_lockhp;
114         struct lov_oinfo *loi;
115         ENTRY;
116
117         lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
118         loi = &set->set_md->lsm_oinfo[req->rq_stripe];
119
120         /* XXX FIXME: This unpleasantness doesn't belong here at *all*.
121          * It belongs in the OSC, except that the OSC doesn't have
122          * access to the real LOI -- it gets a copy, that we created
123          * above, and that copy can be arbitrarily out of date.
124          *
125          * The LOV API is due for a serious rewriting anyways, and this
126          * can be addressed then. */
127         if (rc == ELDLM_OK) {
128                 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
129                 __u64 tmp = req->rq_md->lsm_oinfo->loi_rss;
130
131                 LASSERT(lock != NULL);
132                 loi->loi_rss = tmp;
133                 loi->loi_mtime = req->rq_md->lsm_oinfo->loi_mtime;
134                 loi->loi_blocks = req->rq_md->lsm_oinfo->loi_blocks;
135                 /* Extend KMS up to the end of this lock and no further
136                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
137                 if (tmp > lock->l_policy_data.l_extent.end)
138                         tmp = lock->l_policy_data.l_extent.end + 1;
139                 if (tmp >= loi->loi_kms) {
140                         LDLM_DEBUG(lock, "lock acquired, setting rss="
141                                    LPU64", kms="LPU64, loi->loi_rss, tmp);
142                         loi->loi_kms = tmp;
143                         loi->loi_kms_valid = 1;
144                 } else {
145                         LDLM_DEBUG(lock, "lock acquired, setting rss="
146                                    LPU64"; leaving kms="LPU64", end="LPU64,
147                                    loi->loi_rss, loi->loi_kms,
148                                    lock->l_policy_data.l_extent.end);
149                 }
150                 ldlm_lock_allow_match(lock);
151                 LDLM_LOCK_PUT(lock);
152         } else if (rc == ELDLM_LOCK_ABORTED && flags & LDLM_FL_HAS_INTENT) {
153                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
154                 loi->loi_rss = req->rq_md->lsm_oinfo->loi_rss;
155                 loi->loi_mtime = req->rq_md->lsm_oinfo->loi_mtime;
156                 loi->loi_blocks = req->rq_md->lsm_oinfo->loi_blocks;
157                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
158                        " kms="LPU64"\n", loi->loi_rss, loi->loi_kms);
159                 rc = ELDLM_OK;
160         } else {
161                 struct obd_export *exp = set->set_exp;
162                 struct lov_obd *lov = &exp->exp_obd->u.lov;
163
164                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
165                 if (lov->tgts[req->rq_idx].active) {
166                         CERROR("error: enqueue objid "LPX64" subobj "
167                                 LPX64" on OST idx %d: rc = %d\n",
168                                 set->set_md->lsm_object_id, loi->loi_id,
169                                 loi->loi_ost_idx, rc);
170                 } else {
171                         rc = ELDLM_OK;
172                 }
173         }
174         lov_update_set(set, req, rc);
175         RETURN(rc);
176 }
177
178 static int enqueue_done(struct lov_request_set *set, __u32 mode)
179 {
180         struct lov_request *req;
181         struct lustre_handle *lov_lockhp = NULL;
182         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
183         int rc = 0;
184         ENTRY;
185
186         LASSERT(set->set_completes);
187         /* enqueue/match success, just return */
188         if (set->set_completes == set->set_success)
189                 RETURN(0);
190
191         /* cancel enqueued/matched locks */
192         list_for_each_entry(req, &set->set_list, rq_link) {
193                 if (!req->rq_complete || req->rq_rc)
194                         continue;
195
196                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
197                 LASSERT(lov_lockhp);
198                 if (lov_lockhp->cookie == 0)
199                         continue;
200
201                 rc = obd_cancel(lov->tgts[req->rq_idx].ltd_exp, req->rq_md,
202                                 mode, lov_lockhp);
203                 if (rc && lov->tgts[req->rq_idx].active)
204                         CERROR("cancelling obdjid "LPX64" on OST "
205                                "idx %d error: rc = %d\n",
206                                req->rq_md->lsm_object_id, req->rq_idx, rc);
207         }
208         lov_llh_put(set->set_lockh);
209         RETURN(rc);
210 }
211
212 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode)
213 {
214         int rc = 0;
215         ENTRY;
216
217         if (set == NULL)
218                 RETURN(0);
219         LASSERT(set->set_exp);
220         if (set->set_completes)
221                 rc = enqueue_done(set, mode);
222         else
223                 lov_llh_put(set->set_lockh);
224
225         if (atomic_dec_and_test(&set->set_refcount))
226                 lov_finish_set(set);
227
228         RETURN(rc);
229 }
230
231 int lov_prep_enqueue_set(struct obd_export *exp, struct lov_stripe_md *lsm,
232                          ldlm_policy_data_t *policy, __u32 mode,
233                          struct lustre_handle *lockh,
234                          struct lov_request_set **reqset)
235 {
236         struct lov_obd *lov = &exp->exp_obd->u.lov;
237         struct lov_request_set *set;
238         int i, rc = 0;
239         struct lov_oinfo *loi;
240         ENTRY;
241
242         OBD_ALLOC(set, sizeof(*set));
243         if (set == NULL)
244                 RETURN(-ENOMEM);
245         lov_init_set(set);
246
247         set->set_exp = exp;
248         set->set_md = lsm;
249         set->set_lockh = lov_llh_new(lsm);
250         if (set->set_lockh == NULL)
251                 GOTO(out_set, rc = -ENOMEM);
252         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
253
254         loi = lsm->lsm_oinfo;
255         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
256                 struct lov_request *req;
257                 obd_off start, end;
258
259                 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
260                                            policy->l_extent.end, &start, &end))
261                         continue;
262
263                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
264                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
265                         continue;
266                 }
267
268                 OBD_ALLOC(req, sizeof(*req));
269                 if (req == NULL)
270                         GOTO(out_set, rc = -ENOMEM);
271
272                 req->rq_buflen = sizeof(*req->rq_md) +
273                         sizeof(struct lov_oinfo);
274                 OBD_ALLOC(req->rq_md, req->rq_buflen);
275                 if (req->rq_md == NULL)
276                         GOTO(out_set, rc = -ENOMEM);
277
278                 req->rq_extent.start = start;
279                 req->rq_extent.end = end;
280
281                 req->rq_idx = loi->loi_ost_idx;
282                 req->rq_stripe = i;
283
284                 /* XXX LOV STACKING: submd should be from the subobj */
285                 req->rq_md->lsm_object_id = loi->loi_id;
286                 req->rq_md->lsm_stripe_count = 0;
287                 req->rq_md->lsm_oinfo->loi_kms_valid = loi->loi_kms_valid;
288                 req->rq_md->lsm_oinfo->loi_rss = loi->loi_rss;
289                 req->rq_md->lsm_oinfo->loi_kms = loi->loi_kms;
290                 req->rq_md->lsm_oinfo->loi_blocks = loi->loi_blocks;
291                 req->rq_md->lsm_oinfo->loi_mtime = loi->loi_mtime;
292
293                 lov_set_add_req(req, set);
294         }
295         if (!set->set_count)
296                 GOTO(out_set, rc = -EIO);
297         *reqset = set;
298         RETURN(0);
299 out_set:
300         lov_fini_enqueue_set(set, mode);
301         RETURN(rc);
302 }
303
304 int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
305                          int rc)
306 {
307         int ret = rc;
308         ENTRY;
309
310         if (rc == 1)
311                 ret = 0;
312         lov_update_set(set, req, ret);
313         RETURN(rc);
314 }
315
316 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
317 {
318         int rc = 0;
319         ENTRY;
320
321         if (set == NULL)
322                 RETURN(0);
323         LASSERT(set->set_exp);
324         if (set->set_completes) {
325                 if (set->set_count == set->set_success &&
326                     flags & LDLM_FL_TEST_LOCK)
327                         lov_llh_put(set->set_lockh);
328                 rc = enqueue_done(set, mode);
329         } else {
330                 lov_llh_put(set->set_lockh);
331         }
332
333         if (atomic_dec_and_test(&set->set_refcount))
334                 lov_finish_set(set);
335
336         RETURN(rc);
337 }
338
339 int lov_prep_match_set(struct obd_export *exp, struct lov_stripe_md *lsm,
340                        ldlm_policy_data_t *policy, __u32 mode,
341                        struct lustre_handle *lockh,
342                        struct lov_request_set **reqset)
343 {
344         struct lov_obd *lov = &exp->exp_obd->u.lov;
345         struct lov_request_set *set;
346         int i, rc = 0;
347         struct lov_oinfo *loi;
348         ENTRY;
349
350         OBD_ALLOC(set, sizeof(*set));
351         if (set == NULL)
352                 RETURN(-ENOMEM);
353         lov_init_set(set);
354
355         set->set_exp = exp;
356         set->set_md = lsm;
357         set->set_lockh = lov_llh_new(lsm);
358         if (set->set_lockh == NULL)
359                 GOTO(out_set, rc = -ENOMEM);
360         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
361
362         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++, loi++){
363                 struct lov_request *req;
364                 obd_off start, end;
365
366                 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
367                                            policy->l_extent.end, &start, &end))
368                         continue;
369
370                 /* FIXME raid1 should grace this error */
371                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
372                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
373                         GOTO(out_set, rc = -EIO);
374                 }
375
376                 OBD_ALLOC(req, sizeof(*req));
377                 if (req == NULL)
378                         GOTO(out_set, rc = -ENOMEM);
379
380                 req->rq_buflen = sizeof(*req->rq_md);
381                 OBD_ALLOC(req->rq_md, req->rq_buflen);
382                 if (req->rq_md == NULL)
383                         GOTO(out_set, rc = -ENOMEM);
384
385                 req->rq_extent.start = start;
386                 req->rq_extent.end = end;
387
388                 req->rq_idx = loi->loi_ost_idx;
389                 req->rq_stripe = i;
390
391                 /* XXX LOV STACKING: submd should be from the subobj */
392                 req->rq_md->lsm_object_id = loi->loi_id;
393                 req->rq_md->lsm_stripe_count = 0;
394
395                 lov_set_add_req(req, set);
396         }
397         if (!set->set_count)
398                 GOTO(out_set, rc = -EIO);
399         *reqset = set;
400         RETURN(rc);
401 out_set:
402         lov_fini_match_set(set, mode, 0);
403         RETURN(rc);
404 }
405
406 int lov_fini_cancel_set(struct lov_request_set *set)
407 {
408         int rc = 0;
409         ENTRY;
410
411         LASSERT(set->set_exp);
412         if (set == NULL)
413                 RETURN(0);
414
415         if (set->set_lockh)
416                 lov_llh_put(set->set_lockh);
417
418         if (atomic_dec_and_test(&set->set_refcount))
419                 lov_finish_set(set);
420
421         RETURN(rc);
422 }
423
424 int lov_prep_cancel_set(struct obd_export *exp, struct lov_stripe_md *lsm,
425                         __u32 mode, struct lustre_handle *lockh,
426                         struct lov_request_set **reqset)
427 {
428         struct lov_request_set *set;
429         int i, rc = 0;
430         struct lov_oinfo *loi;
431         ENTRY;
432
433         OBD_ALLOC(set, sizeof(*set));
434         if (set == NULL)
435                 RETURN(-ENOMEM);
436         lov_init_set(set);
437
438         set->set_exp = exp;
439         set->set_md = lsm;
440         set->set_lockh = lov_handle2llh(lockh);
441         if (set->set_lockh == NULL) {
442                 CERROR("LOV: invalid lov lock handle %p\n", lockh);
443                 GOTO(out_set, rc = -EINVAL);
444         }
445         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
446
447         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++, loi++){
448                 struct lov_request *req;
449                 struct lustre_handle *lov_lockhp;
450
451                 lov_lockhp = set->set_lockh->llh_handles + i;
452                 if (lov_lockhp->cookie == 0) {
453                         CDEBUG(D_HA, "lov idx %d subobj "LPX64" no lock?\n",
454                                loi->loi_ost_idx, loi->loi_id);
455                         continue;
456                 }
457
458                 OBD_ALLOC(req, sizeof(*req));
459                 if (req == NULL)
460                         GOTO(out_set, rc = -ENOMEM);
461
462                 req->rq_buflen = sizeof(*req->rq_md);
463                 OBD_ALLOC(req->rq_md, req->rq_buflen);
464                 if (req->rq_md == NULL)
465                         GOTO(out_set, rc = -ENOMEM);
466
467                 req->rq_idx = loi->loi_ost_idx;
468                 req->rq_stripe = i;
469
470                 /* XXX LOV STACKING: submd should be from the subobj */
471                 req->rq_md->lsm_object_id = loi->loi_id;
472                 req->rq_md->lsm_stripe_count = 0;
473
474                 lov_set_add_req(req, set);
475         }
476         if (!set->set_count)
477                 GOTO(out_set, rc = -EIO);
478         *reqset = set;
479         RETURN(rc);
480 out_set:
481         lov_fini_cancel_set(set);
482         RETURN(rc);
483 }
484
485 static int create_done(struct obd_export *exp, struct lov_request_set *set,
486                        struct lov_stripe_md **lsmp)
487 {
488         struct lov_obd *lov = &exp->exp_obd->u.lov;
489         struct obd_trans_info *oti = set->set_oti;
490         struct obdo *src_oa = set->set_oa;
491         struct lov_request *req;
492         struct obdo *ret_oa = NULL;
493         int attrset = 0, rc = 0;
494         ENTRY;
495
496         LASSERT(set->set_completes);
497
498         if (!set->set_success)
499                 GOTO(cleanup, rc = -EIO);
500         if (*lsmp == NULL && set->set_count != set->set_success) {
501                 set->set_count = set->set_success;
502                 qos_shrink_lsm(set);
503         }
504
505         ret_oa = obdo_alloc();
506         if (ret_oa == NULL)
507                 GOTO(cleanup, rc = -ENOMEM);
508
509         list_for_each_entry(req, &set->set_list, rq_link) {
510                 if (!req->rq_complete || req->rq_rc)
511                         continue;
512                 lov_merge_attrs(ret_oa, req->rq_oa, req->rq_oa->o_valid,
513                                 set->set_md, req->rq_stripe, &attrset);
514         }
515         if (src_oa->o_valid & OBD_MD_FLSIZE &&
516             ret_oa->o_size != src_oa->o_size) {
517                 CERROR("original size "LPU64" isn't new object size "LPU64"\n",
518                        src_oa->o_size, ret_oa->o_size);
519                 LBUG();
520         }
521         ret_oa->o_id = src_oa->o_id;
522         memcpy(src_oa, ret_oa, sizeof(*src_oa));
523         obdo_free(ret_oa);
524
525         *lsmp = set->set_md;
526         GOTO(done, rc = 0);
527
528 cleanup:
529         list_for_each_entry(req, &set->set_list, rq_link) {
530                 struct obd_export *sub_exp;
531                 int err = 0;
532
533                 if (!req->rq_complete || req->rq_rc)
534                         continue;
535
536                 sub_exp = lov->tgts[req->rq_idx].ltd_exp,
537                 err = obd_destroy(sub_exp, req->rq_oa, NULL, oti);
538                 if (err)
539                         CERROR("Failed to uncreate objid "LPX64" subobj "
540                                LPX64" on OST idx %d: rc = %d\n",
541                                set->set_oa->o_id, req->rq_oa->o_id,
542                                req->rq_idx, rc);
543         }
544         if (*lsmp == NULL)
545                 obd_free_memmd(exp, &set->set_md);
546 done:
547         if (oti && set->set_cookies) {
548                 oti->oti_logcookies = set->set_cookies;
549                 if (!set->set_cookie_sent) {
550                         oti_free_cookies(oti);
551                         src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
552                 } else {
553                         src_oa->o_valid |= OBD_MD_FLCOOKIE;
554                 }
555         }
556         RETURN(rc);
557 }
558
559 int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp)
560 {
561         int rc = 0;
562         ENTRY;
563
564         LASSERT(set->set_exp);
565         if (set == NULL)
566                 RETURN(0);
567         if (set->set_completes) {
568                 rc = create_done(set->set_exp, set, lsmp);
569                 /* FIXME update qos data here */
570         }
571
572         if (atomic_dec_and_test(&set->set_refcount))
573                 lov_finish_set(set);
574
575         RETURN(rc);
576 }
577
578 int lov_update_create_set(struct lov_request_set *set,
579                           struct lov_request *req, int rc)
580 {
581         struct obd_trans_info *oti = set->set_oti;
582         struct lov_stripe_md *lsm = set->set_md;
583         struct lov_oinfo *loi;
584         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
585         ENTRY;
586
587         req->rq_stripe = set->set_success;
588         loi = &lsm->lsm_oinfo[req->rq_stripe];
589
590         if (rc && lov->tgts[req->rq_idx].active) {
591                 CERROR("error creating objid "LPX64" sub-object"
592                        " on OST idx %d/%d: rc = %d\n",
593                        set->set_oa->o_id, req->rq_idx,
594                        lsm->lsm_stripe_count, rc);
595                 if (rc > 0) {
596                         CERROR("obd_create returned invalid err %d\n", rc);
597                         rc = -EIO;
598                 }
599         }
600         lov_update_set(set, req, rc);
601         if (rc)
602                 RETURN(rc);
603
604         if (oti && oti->oti_objid)
605                 oti->oti_objid[req->rq_idx] = req->rq_oa->o_id;
606
607         loi->loi_id = req->rq_oa->o_id;
608         loi->loi_ost_idx = req->rq_idx;
609         CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPX64" at idx %d\n",
610                lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
611         loi_init(loi);
612
613         if (set->set_cookies)
614                 ++oti->oti_logcookies;
615         if (req->rq_oa->o_valid & OBD_MD_FLCOOKIE)
616                 set->set_cookie_sent++;
617
618         RETURN(0);
619 }
620
621 int lov_prep_create_set(struct obd_export *exp, struct lov_stripe_md **lsmp,
622                         struct obdo *src_oa, struct obd_trans_info *oti,
623                         struct lov_request_set **reqset)
624 {
625         struct lov_obd *lov = &exp->exp_obd->u.lov;
626         struct lov_request_set *set;
627         int rc = 0, newea = 0;
628         ENTRY;
629
630         OBD_ALLOC(set, sizeof(*set));
631         if (set == NULL)
632                 RETURN(-ENOMEM);
633         lov_init_set(set);
634
635         set->set_exp = exp;
636         set->set_md = *lsmp;
637         set->set_oa = src_oa;
638         set->set_oti = oti;
639
640         if (set->set_md == NULL) {
641                 int stripes, stripe_cnt;
642                 stripe_cnt = lov_get_stripecnt(lov, 0);
643
644                 /* If the MDS file was truncated up to some size, stripe over
645                  * enough OSTs to allow the file to be created at that size. */
646                 if (src_oa->o_valid & OBD_MD_FLSIZE) {
647                         stripes=((src_oa->o_size+LUSTRE_STRIPE_MAXBYTES)>>12)-1;
648                         do_div(stripes, (__u32)(LUSTRE_STRIPE_MAXBYTES >> 12));
649
650                         if (stripes > lov->desc.ld_active_tgt_count)
651                                 GOTO(out_set, rc = -EFBIG);
652                         if (stripes < stripe_cnt)
653                                 stripes = stripe_cnt;
654                 } else {
655                         stripes = stripe_cnt;
656                 }
657
658                 rc = lov_alloc_memmd(&set->set_md, stripes,
659                                      lov->desc.ld_pattern ?
660                                      lov->desc.ld_pattern : LOV_PATTERN_RAID0);
661                 if (rc < 0)
662                         goto out_set;
663                 newea = 1;
664         }
665
666         rc = qos_prep_create(lov, set, newea);
667         if (rc)
668                 goto out_lsm;
669
670         if (oti && (src_oa->o_valid & OBD_MD_FLCOOKIE)) {
671                 oti_alloc_cookies(oti, set->set_count);
672                 if (!oti->oti_logcookies)
673                         goto out_lsm;
674                 set->set_cookies = oti->oti_logcookies;
675         }
676         *reqset = set;
677         RETURN(rc);
678
679 out_lsm:
680         if (*lsmp == NULL)
681                 obd_free_memmd(exp, &set->set_md);
682 out_set:
683         lov_fini_create_set(set, lsmp);
684         RETURN(rc);
685 }
686
687 static int common_attr_done(struct lov_request_set *set)
688 {
689         struct list_head *pos;
690         struct lov_request *req;
691         struct obdo *tmp_oa;
692         int rc = 0, attrset = 0;
693         ENTRY;
694
695         if (set->set_oa == NULL)
696                 RETURN(0);
697
698         if (!set->set_success)
699                 RETURN(-EIO);
700
701         tmp_oa = obdo_alloc();
702         if (tmp_oa == NULL)
703                 GOTO(out, rc = -ENOMEM);
704
705         list_for_each (pos, &set->set_list) {
706                 req = list_entry(pos, struct lov_request, rq_link);
707
708                 if (!req->rq_complete || req->rq_rc)
709                         continue;
710                 if (req->rq_oa->o_valid == 0)   /* inactive stripe */
711                         continue;
712                 lov_merge_attrs(tmp_oa, req->rq_oa, req->rq_oa->o_valid,
713                                 set->set_md, req->rq_stripe, &attrset);
714         }
715         if (!attrset) {
716                 CERROR("No stripes had valid attrs\n");
717                 rc = -EIO;
718         }
719         tmp_oa->o_id = set->set_oa->o_id;
720         memcpy(set->set_oa, tmp_oa, sizeof(*set->set_oa));
721 out:
722         if (tmp_oa)
723                 obdo_free(tmp_oa);
724         RETURN(rc);
725
726 }
727
728 static int brw_done(struct lov_request_set *set)
729 {
730         struct lov_stripe_md *lsm = set->set_md;
731         struct lov_oinfo     *loi = NULL;
732         struct list_head *pos;
733         struct lov_request *req;
734         ENTRY;
735
736         list_for_each (pos, &set->set_list) {
737                 req = list_entry(pos, struct lov_request, rq_link);
738
739                 if (!req->rq_complete || req->rq_rc)
740                         continue;
741
742                 loi = &lsm->lsm_oinfo[req->rq_stripe];
743
744                 if (req->rq_oa->o_valid & OBD_MD_FLBLOCKS)
745                         loi->loi_blocks = req->rq_oa->o_blocks;
746         }
747
748         RETURN(0);
749 }
750
751 int lov_fini_brw_set(struct lov_request_set *set)
752 {
753         int rc = 0;
754         ENTRY;
755
756         LASSERT(set->set_exp);
757         if (set == NULL)
758                 RETURN(0);
759         if (set->set_completes) {
760                 rc = brw_done(set);
761                 /* FIXME update qos data here */
762         }
763         if (atomic_dec_and_test(&set->set_refcount))
764                 lov_finish_set(set);
765
766         RETURN(rc);
767 }
768
769 int lov_prep_brw_set(struct obd_export *exp, struct obdo *src_oa,
770                      struct lov_stripe_md *lsm, obd_count oa_bufs,
771                      struct brw_page *pga, struct obd_trans_info *oti,
772                      struct lov_request_set **reqset)
773 {
774         struct {
775                 obd_count       index;
776                 obd_count       count;
777                 obd_count       off;
778         } *info = NULL;
779         struct lov_request_set *set;
780         struct lov_oinfo *loi = NULL;
781         struct lov_obd *lov = &exp->exp_obd->u.lov;
782         int rc = 0, i, shift;
783         ENTRY;
784
785         OBD_ALLOC(set, sizeof(*set));
786         if (set == NULL)
787                 RETURN(-ENOMEM);
788         lov_init_set(set);
789
790         set->set_exp = exp;
791         set->set_md = lsm;
792         set->set_oa = src_oa;
793         set->set_oti = oti;
794         set->set_oabufs = oa_bufs;
795         OBD_ALLOC(set->set_pga, oa_bufs * sizeof(*set->set_pga));
796         if (!set->set_pga)
797                 GOTO(out, rc = -ENOMEM);
798
799         OBD_ALLOC(info, sizeof(*info) * lsm->lsm_stripe_count);
800         if (!info)
801                 GOTO(out, rc = -ENOMEM);
802
803         /* calculate the page count for each stripe */
804         for (i = 0; i < oa_bufs; i++) {
805                 int stripe = lov_stripe_number(lsm, pga[i].off);
806                 info[stripe].count++;
807         }
808
809         /* alloc and initialize lov request */
810         shift = 0;
811         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++, loi++){
812                 struct lov_request *req;
813
814                 if (info[i].count == 0)
815                         continue;
816
817                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
818                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
819                         GOTO(out, rc = -EIO);
820                 }
821
822                 OBD_ALLOC(req, sizeof(*req));
823                 if (req == NULL)
824                         GOTO(out, rc = -ENOMEM);
825
826                 req->rq_oa = obdo_alloc();
827                 if (req->rq_oa == NULL)
828                         GOTO(out, rc = -ENOMEM);
829
830                 if (src_oa)
831                         memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
832                 req->rq_oa->o_id = loi->loi_id;
833
834                 req->rq_buflen = sizeof(*req->rq_md);
835                 OBD_ALLOC(req->rq_md, req->rq_buflen);
836                 if (req->rq_md == NULL)
837                         GOTO(out, rc = -ENOMEM);
838
839                 req->rq_idx = loi->loi_ost_idx;
840                 req->rq_stripe = i;
841
842                 /* XXX LOV STACKING */
843                 req->rq_md->lsm_object_id = loi->loi_id;
844                 req->rq_md->lsm_object_gr = lsm->lsm_object_gr;
845                 req->rq_oabufs = info[i].count;
846                 req->rq_pgaidx = shift;
847                 shift += req->rq_oabufs;
848
849                 /* remember the index for sort brw_page array */
850                 info[i].index = req->rq_pgaidx;
851
852                 lov_set_add_req(req, set);
853         }
854         if (!set->set_count)
855                 GOTO(out, rc = -EIO);
856
857         /* rotate & sort the brw_page array */
858         for (i = 0; i < oa_bufs; i++) {
859                 int stripe = lov_stripe_number(lsm, pga[i].off);
860
861                 shift = info[stripe].index + info[stripe].off;
862                 LASSERT(shift < oa_bufs);
863                 set->set_pga[shift] = pga[i];
864                 lov_stripe_offset(lsm, pga[i].off, stripe,
865                                   &set->set_pga[shift].off);
866                 info[stripe].off++;
867         }
868 out:
869         if (info)
870                 OBD_FREE(info, sizeof(*info) * lsm->lsm_stripe_count);
871
872         if (rc == 0)
873                 *reqset = set;
874         else
875                 lov_fini_brw_set(set);
876
877         RETURN(rc);
878 }
879
880 int lov_fini_getattr_set(struct lov_request_set *set)
881 {
882         int rc = 0;
883         ENTRY;
884
885         LASSERT(set->set_exp);
886         if (set == NULL)
887                 RETURN(0);
888         if (set->set_completes)
889                 rc = common_attr_done(set);
890
891         if (atomic_dec_and_test(&set->set_refcount))
892                 lov_finish_set(set);
893
894         RETURN(rc);
895 }
896
897 int lov_prep_getattr_set(struct obd_export *exp, struct obdo *src_oa,
898                          struct lov_stripe_md *lsm,
899                          struct lov_request_set **reqset)
900 {
901         struct lov_request_set *set;
902         struct lov_oinfo *loi = NULL;
903         struct lov_obd *lov = &exp->exp_obd->u.lov;
904         int rc = 0, i;
905         ENTRY;
906
907         OBD_ALLOC(set, sizeof(*set));
908         if (set == NULL)
909                 RETURN(-ENOMEM);
910         lov_init_set(set);
911
912         set->set_exp = exp;
913         set->set_md = lsm;
914         set->set_oa = src_oa;
915
916         loi = lsm->lsm_oinfo;
917         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
918                 struct lov_request *req;
919
920                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
921                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
922                         continue;
923                 }
924
925                 OBD_ALLOC(req, sizeof(*req));
926                 if (req == NULL)
927                         GOTO(out_set, rc = -ENOMEM);
928
929                 req->rq_stripe = i;
930                 req->rq_idx = loi->loi_ost_idx;
931
932                 req->rq_oa = obdo_alloc();
933                 if (req->rq_oa == NULL)
934                         GOTO(out_set, rc = -ENOMEM);
935                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
936                 req->rq_oa->o_id = loi->loi_id;
937
938                 lov_set_add_req(req, set);
939         }
940         if (!set->set_count)
941                 GOTO(out_set, rc = -EIO);
942         *reqset = set;
943         RETURN(rc);
944 out_set:
945         lov_fini_getattr_set(set);
946         RETURN(rc);
947 }
948
949 int lov_fini_destroy_set(struct lov_request_set *set)
950 {
951         ENTRY;
952
953         LASSERT(set->set_exp);
954         if (set == NULL)
955                 RETURN(0);
956         if (set->set_completes) {
957                 /* FIXME update qos data here */
958         }
959
960         if (atomic_dec_and_test(&set->set_refcount))
961                 lov_finish_set(set);
962
963         RETURN(0);
964 }
965
966 int lov_prep_destroy_set(struct obd_export *exp, struct obdo *src_oa,
967                          struct lov_stripe_md *lsm,
968                          struct obd_trans_info *oti,
969                          struct lov_request_set **reqset)
970 {
971         struct lov_request_set *set;
972         struct lov_oinfo *loi = NULL;
973         struct lov_obd *lov = &exp->exp_obd->u.lov;
974         int rc = 0, cookie_set = 0, i;
975         ENTRY;
976
977         OBD_ALLOC(set, sizeof(*set));
978         if (set == NULL)
979                 RETURN(-ENOMEM);
980         lov_init_set(set);
981
982         set->set_exp = exp;
983         set->set_md = lsm;
984         set->set_oa = src_oa;
985         set->set_oti = oti;
986         if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
987                 set->set_cookies = oti->oti_logcookies;
988
989         loi = lsm->lsm_oinfo;
990         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
991                 struct lov_request *req;
992
993                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
994                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
995                         continue;
996                 }
997
998                 OBD_ALLOC(req, sizeof(*req));
999                 if (req == NULL)
1000                         GOTO(out_set, rc = -ENOMEM);
1001
1002                 req->rq_stripe = i;
1003                 req->rq_idx = loi->loi_ost_idx;
1004
1005                 req->rq_oa = obdo_alloc();
1006                 if (req->rq_oa == NULL)
1007                         GOTO(out_set, rc = -ENOMEM);
1008                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1009                 req->rq_oa->o_id = loi->loi_id;
1010
1011                 /* Setup the first request's cookie position */
1012                 if (!cookie_set && set->set_cookies) {
1013                         oti->oti_logcookies = set->set_cookies + i;
1014                         cookie_set = 1;
1015                 }
1016                 lov_set_add_req(req, set);
1017         }
1018         if (!set->set_count)
1019                 GOTO(out_set, rc = -EIO);
1020         *reqset = set;
1021         RETURN(rc);
1022 out_set:
1023         lov_fini_destroy_set(set);
1024         RETURN(rc);
1025 }
1026
1027 int lov_fini_setattr_set(struct lov_request_set *set)
1028 {
1029         int rc = 0;
1030         ENTRY;
1031
1032         LASSERT(set->set_exp);
1033         if (set == NULL)
1034                 RETURN(0);
1035         if (set->set_completes) {
1036                 rc = common_attr_done(set);
1037                 /* FIXME update qos data here */
1038         }
1039
1040         if (atomic_dec_and_test(&set->set_refcount))
1041                 lov_finish_set(set);
1042         RETURN(rc);
1043 }
1044
1045 int lov_prep_setattr_set(struct obd_export *exp, struct obdo *src_oa,
1046                          struct lov_stripe_md *lsm, struct obd_trans_info *oti,
1047                          struct lov_request_set **reqset)
1048 {
1049         struct lov_request_set *set;
1050         struct lov_oinfo *loi = NULL;
1051         struct lov_obd *lov = &exp->exp_obd->u.lov;
1052         int rc = 0, i;
1053         ENTRY;
1054
1055         OBD_ALLOC(set, sizeof(*set));
1056         if (set == NULL)
1057                 RETURN(-ENOMEM);
1058         lov_init_set(set);
1059
1060         set->set_exp = exp;
1061         set->set_md = lsm;
1062         set->set_oa = src_oa;
1063
1064         loi = lsm->lsm_oinfo;
1065         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1066                 struct lov_request *req;
1067
1068                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1069                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1070                         continue;
1071                 }
1072
1073                 OBD_ALLOC(req, sizeof(*req));
1074                 if (req == NULL)
1075                         GOTO(out_set, rc = -ENOMEM);
1076                 req->rq_stripe = i;
1077                 req->rq_idx = loi->loi_ost_idx;
1078
1079                 req->rq_oa = obdo_alloc();
1080                 if (req->rq_oa == NULL)
1081                         GOTO(out_set, rc = -ENOMEM);
1082                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1083                 req->rq_oa->o_id = loi->loi_id;
1084
1085                 if (src_oa->o_valid & OBD_MD_FLSIZE) {
1086                         if (lov_stripe_offset(lsm, src_oa->o_size, i,
1087                                               &req->rq_oa->o_size) < 0 &&
1088                             req->rq_oa->o_size)
1089                                 req->rq_oa->o_size--;
1090                         CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1091                                i, req->rq_oa->o_size, src_oa->o_size);
1092                 }
1093                 lov_set_add_req(req, set);
1094         }
1095         if (!set->set_count)
1096                 GOTO(out_set, rc = -EIO);
1097         *reqset = set;
1098         RETURN(rc);
1099 out_set:
1100         lov_fini_setattr_set(set);
1101         RETURN(rc);
1102 }
1103
1104 int lov_update_setattr_set(struct lov_request_set *set,
1105                            struct lov_request *req, int rc)
1106 {
1107         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
1108         ENTRY;
1109
1110         lov_update_set(set, req, rc);
1111
1112         /* grace error on inactive ost */
1113         if (rc && !lov->tgts[req->rq_idx].active)
1114                 rc = 0;
1115
1116         /* FIXME: LOV STACKING update loi data should be done by OSC *
1117          * when this is gone we can go back to using lov_update_common_set() */
1118         if (rc == 0 && req->rq_oa->o_valid & OBD_MD_FLMTIME)
1119                 set->set_md->lsm_oinfo[req->rq_stripe].loi_mtime =
1120                         req->rq_oa->o_mtime;
1121         /* ditto loi_atime, loi_ctime when available */
1122
1123         RETURN(rc);
1124 }
1125
1126 int lov_update_punch_set(struct lov_request_set *set, struct lov_request *req,
1127                          int rc)
1128 {
1129         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
1130         ENTRY;
1131
1132         lov_update_set(set, req, rc);
1133         if (rc && !lov->tgts[req->rq_idx].active)
1134                 rc = 0;
1135         /* FIXME in raid1 regime, should return 0 */
1136         RETURN(rc);
1137 }
1138
1139 int lov_fini_punch_set(struct lov_request_set *set)
1140 {
1141         int rc = 0;
1142         ENTRY;
1143
1144         LASSERT(set->set_exp);
1145         if (set == NULL)
1146                 RETURN(0);
1147         if (set->set_completes) {
1148                 if (!set->set_success)
1149                         rc = -EIO;
1150                 /* FIXME update qos data here */
1151         }
1152
1153         if (atomic_dec_and_test(&set->set_refcount))
1154                 lov_finish_set(set);
1155
1156         RETURN(rc);
1157 }
1158
1159 int lov_prep_punch_set(struct obd_export *exp, struct obdo *src_oa,
1160                        struct lov_stripe_md *lsm, obd_off start,
1161                        obd_off end, struct obd_trans_info *oti,
1162                        struct lov_request_set **reqset)
1163 {
1164         struct lov_request_set *set;
1165         struct lov_oinfo *loi = NULL;
1166         struct lov_obd *lov = &exp->exp_obd->u.lov;
1167         int rc = 0, i;
1168         ENTRY;
1169
1170         OBD_ALLOC(set, sizeof(*set));
1171         if (set == NULL)
1172                 RETURN(-ENOMEM);
1173         lov_init_set(set);
1174
1175         set->set_exp = exp;
1176         set->set_md = lsm;
1177         set->set_oa = src_oa;
1178
1179         loi = lsm->lsm_oinfo;
1180         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1181                 struct lov_request *req;
1182                 obd_off rs, re;
1183
1184                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1185                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1186                         continue;
1187                 }
1188
1189                 if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
1190                         continue;
1191
1192                 OBD_ALLOC(req, sizeof(*req));
1193                 if (req == NULL)
1194                         GOTO(out_set, rc = -ENOMEM);
1195                 req->rq_stripe = i;
1196                 req->rq_idx = loi->loi_ost_idx;
1197
1198                 req->rq_oa = obdo_alloc();
1199                 if (req->rq_oa == NULL)
1200                         GOTO(out_set, rc = -ENOMEM);
1201                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1202                 req->rq_oa->o_id = loi->loi_id;
1203
1204                 req->rq_extent.start = rs;
1205                 req->rq_extent.end = re;
1206
1207                 lov_set_add_req(req, set);
1208         }
1209         if (!set->set_count)
1210                 GOTO(out_set, rc = -EIO);
1211         *reqset = set;
1212         RETURN(rc);
1213 out_set:
1214         lov_fini_punch_set(set);
1215         RETURN(rc);
1216 }
1217
1218 int lov_fini_sync_set(struct lov_request_set *set)
1219 {
1220         int rc = 0;
1221         ENTRY;
1222
1223         LASSERT(set->set_exp);
1224         if (set == NULL)
1225                 RETURN(0);
1226         if (set->set_completes) {
1227                 if (!set->set_success)
1228                         rc = -EIO;
1229                 /* FIXME update qos data here */
1230         }
1231
1232         if (atomic_dec_and_test(&set->set_refcount))
1233                 lov_finish_set(set);
1234
1235         RETURN(rc);
1236 }
1237
1238 int lov_prep_sync_set(struct obd_export *exp, struct obdo *src_oa,
1239                       struct lov_stripe_md *lsm, obd_off start,
1240                       obd_off end, struct lov_request_set **reqset)
1241 {
1242         struct lov_request_set *set;
1243         struct lov_oinfo *loi = NULL;
1244         struct lov_obd *lov = &exp->exp_obd->u.lov;
1245         int rc = 0, i;
1246         ENTRY;
1247
1248         OBD_ALLOC(set, sizeof(*set));
1249         if (set == NULL)
1250                 RETURN(-ENOMEM);
1251         lov_init_set(set);
1252
1253         set->set_exp = exp;
1254         set->set_md = lsm;
1255         set->set_oa = src_oa;
1256
1257         loi = lsm->lsm_oinfo;
1258         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1259                 struct lov_request *req;
1260                 obd_off rs, re;
1261
1262                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1263                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1264                         continue;
1265                 }
1266
1267                 if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
1268                         continue;
1269
1270                 OBD_ALLOC(req, sizeof(*req));
1271                 if (req == NULL)
1272                         GOTO(out_set, rc = -ENOMEM);
1273                 req->rq_stripe = i;
1274                 req->rq_idx = loi->loi_ost_idx;
1275
1276                 req->rq_oa = obdo_alloc();
1277                 if (req->rq_oa == NULL)
1278                         GOTO(out_set, rc = -ENOMEM);
1279                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1280                 req->rq_oa->o_id = loi->loi_id;
1281
1282                 req->rq_extent.start = rs;
1283                 req->rq_extent.end = re;
1284
1285                 lov_set_add_req(req, set);
1286         }
1287         if (!set->set_count)
1288                 GOTO(out_set, rc = -EIO);
1289         *reqset = set;
1290         RETURN(rc);
1291 out_set:
1292         lov_fini_sync_set(set);
1293         RETURN(rc);
1294 }