Whamcloud - gitweb
- merge with 1_5,some fixes.
[fs/lustre-release.git] / lustre / lov / lov_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of the Lustre file system, http://www.lustre.org
7  *   Lustre is a trademark of Cluster File Systems, Inc.
8  *
9  *   You may have signed or agreed to another license before downloading
10  *   this software.  If so, you are bound by the terms and conditions
11  *   of that agreement, and the following does not apply to you.  See the
12  *   LICENSE file included with this distribution for more information.
13  *
14  *   If you did not agree to a different license, then this copy of Lustre
15  *   is open source software; you can redistribute it and/or modify it
16  *   under the terms of version 2 of the GNU General Public License as
17  *   published by the Free Software Foundation.
18  *
19  *   In either case, Lustre is distributed in the hope that it will be
20  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
21  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22  *   license text for more details.
23  */
24
25 #ifndef EXPORT_SYMTAB
26 # define EXPORT_SYMTAB
27 #endif
28 #define DEBUG_SUBSYSTEM S_LOV
29
30 #ifdef __KERNEL__
31 #include <libcfs/libcfs.h>
32 #else
33 #include <liblustre.h>
34 #endif
35
36 #include <obd_class.h>
37 #include <obd_lov.h>
38 #include <lustre/lustre_idl.h>
39
40 #include "lov_internal.h"
41
42 static void lov_init_set(struct lov_request_set *set)
43 {
44         set->set_count = 0;
45         set->set_completes = 0;
46         set->set_success = 0;
47         set->set_cookies = 0;
48         CFS_INIT_LIST_HEAD(&set->set_list);
49         atomic_set(&set->set_refcount, 1);
50 }
51
52 static void lov_finish_set(struct lov_request_set *set)
53 {
54         struct list_head *pos, *n;
55         ENTRY;
56
57         LASSERT(set);
58         list_for_each_safe(pos, n, &set->set_list) {
59                 struct lov_request *req = list_entry(pos, struct lov_request,
60                                                      rq_link);
61                 list_del_init(&req->rq_link);
62
63                 if (req->rq_oi.oi_oa)
64                         obdo_free(req->rq_oi.oi_oa);
65                 if (req->rq_oi.oi_md)
66                         OBD_FREE(req->rq_oi.oi_md, req->rq_buflen);
67                 if (req->rq_oi.oi_osfs)
68                         OBD_FREE(req->rq_oi.oi_osfs,
69                                  sizeof(*req->rq_oi.oi_osfs));
70                 OBD_FREE(req, sizeof(*req));
71         }
72
73         if (set->set_pga) {
74                 int len = set->set_oabufs * sizeof(*set->set_pga);
75                 OBD_FREE(set->set_pga, len);
76         }
77         if (set->set_lockh)
78                 lov_llh_put(set->set_lockh);
79
80         OBD_FREE(set, sizeof(*set));
81         EXIT;
82 }
83
84 void lov_update_set(struct lov_request_set *set,
85                     struct lov_request *req, int rc)
86 {
87         req->rq_complete = 1;
88         req->rq_rc = rc;
89
90         set->set_completes++;
91         if (rc == 0)
92                 set->set_success++;
93 }
94
95 int lov_update_common_set(struct lov_request_set *set,
96                           struct lov_request *req, int rc)
97 {
98         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
99         ENTRY;
100
101         lov_update_set(set, req, rc);
102
103         /* grace error on inactive ost */
104         if (rc && !(lov->lov_tgts[req->rq_idx] && 
105                     lov->lov_tgts[req->rq_idx]->ltd_active))
106                 rc = 0;
107
108         /* FIXME in raid1 regime, should return 0 */
109         RETURN(rc);
110 }
111
112 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
113 {
114         list_add_tail(&req->rq_link, &set->set_list);
115         set->set_count++;
116 }
117
118 int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc)
119 {
120         struct lov_request_set *set = req->rq_rqset;
121         struct lustre_handle *lov_lockhp;
122         struct lov_oinfo *loi;
123         ENTRY;
124
125         LASSERT(set != NULL);
126         LASSERT(set->set_oi != NULL);
127
128         lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
129         loi = &set->set_oi->oi_md->lsm_oinfo[req->rq_stripe];
130
131         /* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set
132          * and that copy can be arbitrarily out of date.
133          *
134          * The LOV API is due for a serious rewriting anyways, and this
135          * can be addressed then. */
136
137         if (rc == ELDLM_OK) {
138                 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
139                 __u64 tmp;
140
141                 LASSERT(lock != NULL);
142                 lov_stripe_lock(set->set_oi->oi_md);
143                 loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo->loi_lvb;
144                 tmp = loi->loi_lvb.lvb_size;
145                 /* Extend KMS up to the end of this lock and no further
146                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
147                 if (tmp > lock->l_policy_data.l_extent.end)
148                         tmp = lock->l_policy_data.l_extent.end + 1;
149                 if (tmp >= loi->loi_kms) {
150                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
151                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
152                         loi->loi_kms = tmp;
153                         loi->loi_kms_valid = 1;
154                 } else {
155                         LDLM_DEBUG(lock, "lock acquired, setting rss="
156                                    LPU64"; leaving kms="LPU64", end="LPU64,
157                                    loi->loi_lvb.lvb_size, loi->loi_kms,
158                                    lock->l_policy_data.l_extent.end);
159                 }
160                 lov_stripe_unlock(set->set_oi->oi_md);
161                 ldlm_lock_allow_match(lock);
162                 LDLM_LOCK_PUT(lock);
163         } else if ((rc == ELDLM_LOCK_ABORTED) &&
164                    (set->set_ei->ei_flags & LDLM_FL_HAS_INTENT)) {
165                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
166                 lov_stripe_lock(set->set_oi->oi_md);
167                 loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo->loi_lvb;
168                 lov_stripe_unlock(set->set_oi->oi_md);
169                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
170                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
171                 rc = ELDLM_OK;
172         } else {
173                 struct obd_export *exp = set->set_exp;
174                 struct lov_obd *lov = &exp->exp_obd->u.lov;
175
176                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
177                 if (lov->lov_tgts[req->rq_idx] && 
178                     lov->lov_tgts[req->rq_idx]->ltd_active) {
179                         CERROR("error: enqueue objid "LPX64" subobj "
180                                 LPX64" on OST idx %d: rc = %d\n",
181                                 set->set_oi->oi_md->lsm_object_id,
182                                 loi->loi_id, loi->loi_ost_idx, rc);
183                 } else {
184                         rc = ELDLM_OK;
185                 }
186         }
187         lov_update_set(set, req, rc);
188         RETURN(rc);
189 }
190
191 /* The callback for osc_enqueue that updates lov info for every OSC request. */
192 static int cb_update_enqueue(struct obd_info *oinfo, int rc)
193 {
194         struct obd_enqueue_info *einfo;
195         struct lov_request *lovreq;
196
197         lovreq = container_of(oinfo, struct lov_request, rq_oi);
198         einfo = lovreq->rq_rqset->set_ei;
199         return lov_update_enqueue_set(lovreq, einfo->ei_mode, rc);
200 }
201
202 static int enqueue_done(struct lov_request_set *set, __u32 mode)
203 {
204         struct lov_request *req;
205         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
206         int rc = 0;
207         ENTRY;
208
209         LASSERT(set->set_completes);
210         /* enqueue/match success, just return */
211         if (set->set_completes == set->set_success)
212                 RETURN(0);
213
214         /* cancel enqueued/matched locks */
215         list_for_each_entry(req, &set->set_list, rq_link) {
216                 struct lustre_handle *lov_lockhp;
217
218                 if (!req->rq_complete || req->rq_rc)
219                         continue;
220
221                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
222                 LASSERT(lov_lockhp);
223                 if (!lustre_handle_is_used(lov_lockhp))
224                         continue;
225
226                 rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp,
227                                 req->rq_oi.oi_md, mode, lov_lockhp);
228                 if (rc && lov->lov_tgts[req->rq_idx] &&
229                     lov->lov_tgts[req->rq_idx]->ltd_active)
230                         CERROR("cancelling obdjid "LPX64" on OST "
231                                "idx %d error: rc = %d\n",
232                                req->rq_oi.oi_md->lsm_object_id,
233                                req->rq_idx, rc);
234         }
235         lov_llh_put(set->set_lockh);
236         RETURN(rc);
237 }
238
239 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode)
240 {
241         int rc = 0;
242         ENTRY;
243
244         if (set == NULL)
245                 RETURN(0);
246         LASSERT(set->set_exp);
247         /* Do enqueue_done only for sync requests and if any request
248            succeeded. */
249         if (!set->set_ei->ei_rqset && set->set_completes)
250                 rc = enqueue_done(set, mode);
251         else
252                 lov_llh_put(set->set_lockh);
253
254         if (atomic_dec_and_test(&set->set_refcount))
255                 lov_finish_set(set);
256
257         RETURN(rc);
258 }
259
260 int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
261                          struct obd_enqueue_info *einfo,
262                          struct lov_request_set **reqset)
263 {
264         struct lov_obd *lov = &exp->exp_obd->u.lov;
265         struct lov_request_set *set;
266         int i, rc = 0;
267         struct lov_oinfo *loi;
268         ENTRY;
269
270         OBD_ALLOC(set, sizeof(*set));
271         if (set == NULL)
272                 RETURN(-ENOMEM);
273         lov_init_set(set);
274
275         set->set_exp = exp;
276         set->set_oi = oinfo;
277         set->set_ei = einfo;
278         set->set_lockh = lov_llh_new(oinfo->oi_md);
279         if (set->set_lockh == NULL)
280                 GOTO(out_set, rc = -ENOMEM);
281         oinfo->oi_lockh->cookie = set->set_lockh->llh_handle.h_cookie;
282
283         loi = oinfo->oi_md->lsm_oinfo;
284         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++, loi++) {
285                 struct lov_request *req;
286                 obd_off start, end;
287
288                 if (!lov_stripe_intersects(oinfo->oi_md, i,
289                                            oinfo->oi_policy.l_extent.start,
290                                            oinfo->oi_policy.l_extent.end,
291                                            &start, &end))
292                         continue;
293
294                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
295                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
296                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
297                         continue;
298                 }
299
300                 OBD_ALLOC(req, sizeof(*req));
301                 if (req == NULL)
302                         GOTO(out_set, rc = -ENOMEM);
303
304                 req->rq_buflen = sizeof(*req->rq_oi.oi_md) +
305                         sizeof(struct lov_oinfo);
306                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
307                 if (req->rq_oi.oi_md == NULL) {
308                         OBD_FREE(req, sizeof(*req));
309                         GOTO(out_set, rc = -ENOMEM);
310                 }
311
312                 req->rq_rqset = set;
313                 /* Set lov request specific parameters. */
314                 req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;
315                 req->rq_oi.oi_cb_up = cb_update_enqueue;
316
317                 LASSERT(req->rq_oi.oi_lockh);
318
319                 req->rq_oi.oi_policy.l_extent.gid =
320                         oinfo->oi_policy.l_extent.gid;
321                 req->rq_oi.oi_policy.l_extent.start = start;
322                 req->rq_oi.oi_policy.l_extent.end = end;
323
324                 req->rq_idx = loi->loi_ost_idx;
325                 req->rq_stripe = i;
326
327                 /* XXX LOV STACKING: submd should be from the subobj */
328                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
329                 req->rq_oi.oi_md->lsm_stripe_count = 0;
330                 req->rq_oi.oi_md->lsm_oinfo->loi_kms_valid =
331                         loi->loi_kms_valid;
332                 req->rq_oi.oi_md->lsm_oinfo->loi_kms = loi->loi_kms;
333                 req->rq_oi.oi_md->lsm_oinfo->loi_lvb = loi->loi_lvb;
334
335                 lov_set_add_req(req, set);
336         }
337         if (!set->set_count)
338                 GOTO(out_set, rc = -EIO);
339         *reqset = set;
340         RETURN(0);
341 out_set:
342         lov_fini_enqueue_set(set, einfo->ei_mode);
343         RETURN(rc);
344 }
345
346 int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
347                          int rc)
348 {
349         int ret = rc;
350         ENTRY;
351
352         if (rc == 1)
353                 ret = 0;
354         lov_update_set(set, req, ret);
355         RETURN(rc);
356 }
357
358 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
359 {
360         int rc = 0;
361         ENTRY;
362
363         if (set == NULL)
364                 RETURN(0);
365         LASSERT(set->set_exp);
366         if (set->set_completes) {
367                 if (set->set_count == set->set_success &&
368                     flags & LDLM_FL_TEST_LOCK)
369                         lov_llh_put(set->set_lockh);
370                 rc = enqueue_done(set, mode);
371         } else {
372                 lov_llh_put(set->set_lockh);
373         }
374
375         if (atomic_dec_and_test(&set->set_refcount))
376                 lov_finish_set(set);
377
378         RETURN(rc);
379 }
380
381 int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo,
382                        struct lov_stripe_md *lsm, ldlm_policy_data_t *policy,
383                        __u32 mode, struct lustre_handle *lockh,
384                        struct lov_request_set **reqset)
385 {
386         struct lov_obd *lov = &exp->exp_obd->u.lov;
387         struct lov_request_set *set;
388         int i, rc = 0;
389         struct lov_oinfo *loi;
390         ENTRY;
391
392         OBD_ALLOC(set, sizeof(*set));
393         if (set == NULL)
394                 RETURN(-ENOMEM);
395         lov_init_set(set);
396
397         set->set_exp = exp;
398         set->set_oi = oinfo;
399         set->set_oi->oi_md = lsm;
400         set->set_lockh = lov_llh_new(lsm);
401         if (set->set_lockh == NULL)
402                 GOTO(out_set, rc = -ENOMEM);
403         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
404
405         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++, loi++){
406                 struct lov_request *req;
407                 obd_off start, end;
408
409                 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
410                                            policy->l_extent.end, &start, &end))
411                         continue;
412
413                 /* FIXME raid1 should grace this error */
414                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
415                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
416                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
417                         GOTO(out_set, rc = -EIO);
418                 }
419
420                 OBD_ALLOC(req, sizeof(*req));
421                 if (req == NULL)
422                         GOTO(out_set, rc = -ENOMEM);
423
424                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
425                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
426                 if (req->rq_oi.oi_md == NULL) {
427                         OBD_FREE(req, sizeof(*req));
428                         GOTO(out_set, rc = -ENOMEM);
429                 }
430
431                 req->rq_oi.oi_policy.l_extent.start = start;
432                 req->rq_oi.oi_policy.l_extent.end = end;
433                 req->rq_oi.oi_policy.l_extent.gid = policy->l_extent.gid;
434
435                 req->rq_idx = loi->loi_ost_idx;
436                 req->rq_stripe = i;
437
438                 /* XXX LOV STACKING: submd should be from the subobj */
439                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
440                 req->rq_oi.oi_md->lsm_stripe_count = 0;
441
442                 lov_set_add_req(req, set);
443         }
444         if (!set->set_count)
445                 GOTO(out_set, rc = -EIO);
446         *reqset = set;
447         RETURN(rc);
448 out_set:
449         lov_fini_match_set(set, mode, 0);
450         RETURN(rc);
451 }
452
453 int lov_fini_cancel_set(struct lov_request_set *set)
454 {
455         int rc = 0;
456         ENTRY;
457
458         if (set == NULL)
459                 RETURN(0);
460
461         LASSERT(set->set_exp);
462         if (set->set_lockh)
463                 lov_llh_put(set->set_lockh);
464
465         if (atomic_dec_and_test(&set->set_refcount))
466                 lov_finish_set(set);
467
468         RETURN(rc);
469 }
470
471 int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,
472                         struct lov_stripe_md *lsm, __u32 mode,
473                         struct lustre_handle *lockh,
474                         struct lov_request_set **reqset)
475 {
476         struct lov_request_set *set;
477         int i, rc = 0;
478         struct lov_oinfo *loi;
479         ENTRY;
480
481         OBD_ALLOC(set, sizeof(*set));
482         if (set == NULL)
483                 RETURN(-ENOMEM);
484         lov_init_set(set);
485
486         set->set_exp = exp;
487         set->set_oi = oinfo;
488         set->set_oi->oi_md = lsm;
489         set->set_lockh = lov_handle2llh(lockh);
490         if (set->set_lockh == NULL) {
491                 CERROR("LOV: invalid lov lock handle %p\n", lockh);
492                 GOTO(out_set, rc = -EINVAL);
493         }
494         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
495
496         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++, loi++){
497                 struct lov_request *req;
498                 struct lustre_handle *lov_lockhp;
499
500                 lov_lockhp = set->set_lockh->llh_handles + i;
501                 if (!lustre_handle_is_used(lov_lockhp)) {
502                         CDEBUG(D_HA, "lov idx %d subobj "LPX64" no lock?\n",
503                                loi->loi_ost_idx, loi->loi_id);
504                         continue;
505                 }
506
507                 OBD_ALLOC(req, sizeof(*req));
508                 if (req == NULL)
509                         GOTO(out_set, rc = -ENOMEM);
510
511                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
512                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
513                 if (req->rq_oi.oi_md == NULL) {
514                         OBD_FREE(req, sizeof(*req));
515                         GOTO(out_set, rc = -ENOMEM);
516                 }
517
518                 req->rq_idx = loi->loi_ost_idx;
519                 req->rq_stripe = i;
520
521                 /* XXX LOV STACKING: submd should be from the subobj */
522                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
523                 req->rq_oi.oi_md->lsm_stripe_count = 0;
524
525                 lov_set_add_req(req, set);
526         }
527         if (!set->set_count)
528                 GOTO(out_set, rc = -EIO);
529         *reqset = set;
530         RETURN(rc);
531 out_set:
532         lov_fini_cancel_set(set);
533         RETURN(rc);
534 }
535
536 static int create_done(struct obd_export *exp, struct lov_request_set *set,
537                        struct lov_stripe_md **lsmp)
538 {
539         struct lov_obd *lov = &exp->exp_obd->u.lov;
540         struct obd_trans_info *oti = set->set_oti;
541         struct obdo *src_oa = set->set_oi->oi_oa;
542         struct lov_request *req;
543         struct obdo *ret_oa = NULL;
544         int attrset = 0, rc = 0;
545         ENTRY;
546
547         LASSERT(set->set_completes);
548
549         /* try alloc objects on other osts if osc_create fails for
550          * exceptions: RPC failure, ENOSPC, etc */
551         if (set->set_count != set->set_success) {
552                 list_for_each_entry (req, &set->set_list, rq_link) {
553                         if (req->rq_rc == 0)
554                                 continue;
555
556                         set->set_completes--;
557                         req->rq_complete = 0;
558
559                         rc = qos_remedy_create(set, req);
560                         lov_update_create_set(set, req, rc);
561
562                         if (rc)
563                                 break;
564                 }
565         }
566
567         /* no successful creates */
568         if (set->set_success == 0)
569                 GOTO(cleanup, rc);
570
571         /* If there was an explicit stripe set, fail.  Otherwise, we
572          * got some objects and that's not bad. */
573         if (set->set_count != set->set_success) {
574                 if (*lsmp)
575                         GOTO(cleanup, rc);
576                 set->set_count = set->set_success;
577                 qos_shrink_lsm(set);
578         }
579
580         ret_oa = obdo_alloc();
581         if (ret_oa == NULL)
582                 GOTO(cleanup, rc = -ENOMEM);
583
584         list_for_each_entry(req, &set->set_list, rq_link) {
585                 if (!req->rq_complete || req->rq_rc)
586                         continue;
587                 lov_merge_attrs(ret_oa, req->rq_oi.oi_oa,
588                                 req->rq_oi.oi_oa->o_valid, set->set_oi->oi_md,
589                                 req->rq_stripe, &attrset);
590         }
591         if (src_oa->o_valid & OBD_MD_FLSIZE &&
592             ret_oa->o_size != src_oa->o_size) {
593                 CERROR("original size "LPU64" isn't new object size "LPU64"\n",
594                        src_oa->o_size, ret_oa->o_size);
595                 LBUG();
596         }
597         ret_oa->o_id = src_oa->o_id;
598         memcpy(src_oa, ret_oa, sizeof(*src_oa));
599         obdo_free(ret_oa);
600
601         *lsmp = set->set_oi->oi_md;
602         GOTO(done, rc = 0);
603
604 cleanup:
605         list_for_each_entry(req, &set->set_list, rq_link) {
606                 struct obd_export *sub_exp;
607                 int err = 0;
608
609                 if (!req->rq_complete || req->rq_rc)
610                         continue;
611
612                 sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp;
613                 err = obd_destroy(sub_exp, req->rq_oi.oi_oa, NULL, oti, NULL);
614                 if (err)
615                         CERROR("Failed to uncreate objid "LPX64" subobj "
616                                LPX64" on OST idx %d: rc = %d\n",
617                                src_oa->o_id, req->rq_oi.oi_oa->o_id,
618                                req->rq_idx, rc);
619         }
620         if (*lsmp == NULL)
621                 obd_free_memmd(exp, &set->set_oi->oi_md);
622 done:
623         if (oti && set->set_cookies) {
624                 oti->oti_logcookies = set->set_cookies;
625                 if (!set->set_cookie_sent) {
626                         oti_free_cookies(oti);
627                         src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
628                 } else {
629                         src_oa->o_valid |= OBD_MD_FLCOOKIE;
630                 }
631         }
632         RETURN(rc);
633 }
634
635 int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp)
636 {
637         int rc = 0;
638         ENTRY;
639
640         if (set == NULL)
641                 RETURN(0);
642         LASSERT(set->set_exp);
643         if (set->set_completes)
644                 rc = create_done(set->set_exp, set, lsmp);
645
646         if (atomic_dec_and_test(&set->set_refcount))
647                 lov_finish_set(set);
648
649         RETURN(rc);
650 }
651
652 int lov_update_create_set(struct lov_request_set *set,
653                           struct lov_request *req, int rc)
654 {
655         struct obd_trans_info *oti = set->set_oti;
656         struct lov_stripe_md *lsm = set->set_oi->oi_md;
657         struct lov_oinfo *loi;
658         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
659         ENTRY;
660
661         req->rq_stripe = set->set_success;
662         loi = &lsm->lsm_oinfo[req->rq_stripe];
663
664         if (rc && lov->lov_tgts[req->rq_idx] &&
665             lov->lov_tgts[req->rq_idx]->ltd_active) {
666                 CERROR("error creating fid "LPX64" sub-object"
667                        " on OST idx %d/%d: rc = %d\n",
668                        set->set_oi->oi_oa->o_id, req->rq_idx,
669                        lsm->lsm_stripe_count, rc);
670                 if (rc > 0) {
671                         CERROR("obd_create returned invalid err %d\n", rc);
672                         rc = -EIO;
673                 }
674         }
675         lov_update_set(set, req, rc);
676         if (rc)
677                 RETURN(rc);
678
679         if (oti && oti->oti_objid)
680                 oti->oti_objid[req->rq_idx] = req->rq_oi.oi_oa->o_id;
681
682         loi->loi_id = req->rq_oi.oi_oa->o_id;
683         loi->loi_ost_idx = req->rq_idx;
684         CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPX64" at idx %d\n",
685                lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
686         loi_init(loi);
687
688         if (oti && set->set_cookies)
689                 ++oti->oti_logcookies;
690         if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE)
691                 set->set_cookie_sent++;
692
693         RETURN(0);
694 }
695
696 int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo,
697                         struct lov_stripe_md **lsmp, struct obdo *src_oa,
698                         struct obd_trans_info *oti,
699                         struct lov_request_set **reqset)
700 {
701         struct lov_request_set *set;
702         int rc = 0;
703         ENTRY;
704
705         OBD_ALLOC(set, sizeof(*set));
706         if (set == NULL)
707                 RETURN(-ENOMEM);
708         lov_init_set(set);
709
710         set->set_exp = exp;
711         set->set_oi = oinfo;
712         set->set_oi->oi_md = *lsmp;
713         set->set_oi->oi_oa = src_oa;
714         set->set_oti = oti;
715
716         rc = qos_prep_create(exp, set);
717         if (rc)
718                 lov_fini_create_set(set, lsmp);
719         else
720                 *reqset = set;
721         RETURN(rc);
722 }
723
724 static int common_attr_done(struct lov_request_set *set)
725 {
726         struct list_head *pos;
727         struct lov_request *req;
728         struct obdo *tmp_oa;
729         int rc = 0, attrset = 0;
730         ENTRY;
731
732         LASSERT(set->set_oi != NULL);
733
734         if (set->set_oi->oi_oa == NULL)
735                 RETURN(0);
736
737         if (!set->set_success)
738                 RETURN(-EIO);
739
740         tmp_oa = obdo_alloc();
741         if (tmp_oa == NULL)
742                 GOTO(out, rc = -ENOMEM);
743
744         list_for_each (pos, &set->set_list) {
745                 req = list_entry(pos, struct lov_request, rq_link);
746
747                 if (!req->rq_complete || req->rq_rc)
748                         continue;
749                 if (req->rq_oi.oi_oa->o_valid == 0)   /* inactive stripe */
750                         continue;
751                 lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa,
752                                 req->rq_oi.oi_oa->o_valid,
753                                 set->set_oi->oi_md, req->rq_stripe, &attrset);
754         }
755         if (!attrset) {
756                 CERROR("No stripes had valid attrs\n");
757                 rc = -EIO;
758         }
759         tmp_oa->o_id = set->set_oi->oi_oa->o_id;
760         memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
761 out:
762         if (tmp_oa)
763                 obdo_free(tmp_oa);
764         RETURN(rc);
765
766 }
767
768 static int brw_done(struct lov_request_set *set)
769 {
770         struct lov_stripe_md *lsm = set->set_oi->oi_md;
771         struct lov_oinfo     *loi = NULL;
772         struct list_head *pos;
773         struct lov_request *req;
774         ENTRY;
775
776         list_for_each (pos, &set->set_list) {
777                 req = list_entry(pos, struct lov_request, rq_link);
778
779                 if (!req->rq_complete || req->rq_rc)
780                         continue;
781
782                 loi = &lsm->lsm_oinfo[req->rq_stripe];
783
784                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS)
785                         loi->loi_lvb.lvb_blocks = req->rq_oi.oi_oa->o_blocks;
786         }
787
788         RETURN(0);
789 }
790
791 int lov_fini_brw_set(struct lov_request_set *set)
792 {
793         int rc = 0;
794         ENTRY;
795
796         if (set == NULL)
797                 RETURN(0);
798         LASSERT(set->set_exp);
799         if (set->set_completes) {
800                 rc = brw_done(set);
801                 /* FIXME update qos data here */
802         }
803         if (atomic_dec_and_test(&set->set_refcount))
804                 lov_finish_set(set);
805
806         RETURN(rc);
807 }
808
809 int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
810                      obd_count oa_bufs, struct brw_page *pga,
811                      struct obd_trans_info *oti,
812                      struct lov_request_set **reqset)
813 {
814         struct {
815                 obd_count       index;
816                 obd_count       count;
817                 obd_count       off;
818         } *info = NULL;
819         struct lov_request_set *set;
820         struct lov_oinfo *loi = NULL;
821         struct lov_obd *lov = &exp->exp_obd->u.lov;
822         int rc = 0, i, shift;
823         ENTRY;
824
825         OBD_ALLOC(set, sizeof(*set));
826         if (set == NULL)
827                 RETURN(-ENOMEM);
828         lov_init_set(set);
829
830         set->set_exp = exp;
831         set->set_oti = oti;
832         set->set_oi = oinfo;
833         set->set_oabufs = oa_bufs;
834         OBD_ALLOC(set->set_pga, oa_bufs * sizeof(*set->set_pga));
835         if (!set->set_pga)
836                 GOTO(out, rc = -ENOMEM);
837
838         OBD_ALLOC(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
839         if (!info)
840                 GOTO(out, rc = -ENOMEM);
841
842         /* calculate the page count for each stripe */
843         for (i = 0; i < oa_bufs; i++) {
844                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
845                 info[stripe].count++;
846         }
847
848         /* alloc and initialize lov request */
849         shift = 0;
850         for (i = 0, loi = oinfo->oi_md->lsm_oinfo;
851              i < oinfo->oi_md->lsm_stripe_count; i++, loi++){
852                 struct lov_request *req;
853
854                 if (info[i].count == 0)
855                         continue;
856
857                 if (!lov->lov_tgts[loi->loi_ost_idx] || 
858                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
859                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
860                         GOTO(out, rc = -EIO);
861                 }
862
863                 OBD_ALLOC(req, sizeof(*req));
864                 if (req == NULL)
865                         GOTO(out, rc = -ENOMEM);
866
867                 req->rq_oi.oi_oa = obdo_alloc();
868                 if (req->rq_oi.oi_oa == NULL) {
869                         OBD_FREE(req, sizeof(*req));
870                         GOTO(out, rc = -ENOMEM);
871                 }
872
873                 if (oinfo->oi_oa) {
874                         memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
875                                sizeof(*req->rq_oi.oi_oa));
876                 }
877                 req->rq_oi.oi_oa->o_id = loi->loi_id;
878                 req->rq_oi.oi_oa->o_stripe_idx = i;
879
880                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
881                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
882                 if (req->rq_oi.oi_md == NULL) {
883                         obdo_free(req->rq_oi.oi_oa);
884                         OBD_FREE(req, sizeof(*req));
885                         GOTO(out, rc = -ENOMEM);
886                 }
887
888                 req->rq_idx = loi->loi_ost_idx;
889                 req->rq_stripe = i;
890
891                 /* XXX LOV STACKING */
892                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
893                 req->rq_oi.oi_md->lsm_object_gr = oinfo->oi_md->lsm_object_gr;
894                 req->rq_oabufs = info[i].count;
895                 req->rq_pgaidx = shift;
896                 shift += req->rq_oabufs;
897
898                 /* remember the index for sort brw_page array */
899                 info[i].index = req->rq_pgaidx;
900
901                 lov_set_add_req(req, set);
902         }
903         if (!set->set_count)
904                 GOTO(out, rc = -EIO);
905
906         /* rotate & sort the brw_page array */
907         for (i = 0; i < oa_bufs; i++) {
908                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
909
910                 shift = info[stripe].index + info[stripe].off;
911                 LASSERT(shift < oa_bufs);
912                 set->set_pga[shift] = pga[i];
913                 lov_stripe_offset(oinfo->oi_md, pga[i].off, stripe,
914                                   &set->set_pga[shift].off);
915                 info[stripe].off++;
916         }
917 out:
918         if (info)
919                 OBD_FREE(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
920
921         if (rc == 0)
922                 *reqset = set;
923         else
924                 lov_fini_brw_set(set);
925
926         RETURN(rc);
927 }
928
929 int lov_fini_getattr_set(struct lov_request_set *set)
930 {
931         int rc = 0;
932         ENTRY;
933
934         if (set == NULL)
935                 RETURN(0);
936         LASSERT(set->set_exp);
937         if (set->set_completes)
938                 rc = common_attr_done(set);
939
940         if (atomic_dec_and_test(&set->set_refcount))
941                 lov_finish_set(set);
942
943         RETURN(rc);
944 }
945
946 /* The callback for osc_getattr_async that finilizes a request info when a
947  * response is recieved. */
948 static int cb_getattr_update(struct obd_info *oinfo, int rc)
949 {
950         struct lov_request *lovreq;
951         lovreq = container_of(oinfo, struct lov_request, rq_oi);
952         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
953 }
954
955 int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
956                          struct lov_request_set **reqset)
957 {
958         struct lov_request_set *set;
959         struct lov_oinfo *loi = NULL;
960         struct lov_obd *lov = &exp->exp_obd->u.lov;
961         int rc = 0, i;
962         ENTRY;
963
964         OBD_ALLOC(set, sizeof(*set));
965         if (set == NULL)
966                 RETURN(-ENOMEM);
967         lov_init_set(set);
968
969         set->set_exp = exp;
970         set->set_oi = oinfo;
971
972         loi = oinfo->oi_md->lsm_oinfo;
973         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++, loi++) {
974                 struct lov_request *req;
975
976                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
977                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
978                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
979                         continue;
980                 }
981
982                 OBD_ALLOC(req, sizeof(*req));
983                 if (req == NULL)
984                         GOTO(out_set, rc = -ENOMEM);
985
986                 req->rq_stripe = i;
987                 req->rq_idx = loi->loi_ost_idx;
988
989                 req->rq_oi.oi_oa = obdo_alloc();
990                 if (req->rq_oi.oi_oa == NULL) {
991                         OBD_FREE(req, sizeof(*req));
992                         GOTO(out_set, rc = -ENOMEM);
993                 }
994                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
995                        sizeof(*req->rq_oi.oi_oa));
996                 req->rq_oi.oi_oa->o_id = loi->loi_id;
997                 req->rq_oi.oi_cb_up = cb_getattr_update;
998
999                 lov_set_add_req(req, set);
1000         }
1001         if (!set->set_count)
1002                 GOTO(out_set, rc = -EIO);
1003         *reqset = set;
1004         RETURN(rc);
1005 out_set:
1006         lov_fini_getattr_set(set);
1007         RETURN(rc);
1008 }
1009
1010 int lov_fini_destroy_set(struct lov_request_set *set)
1011 {
1012         ENTRY;
1013
1014         if (set == NULL)
1015                 RETURN(0);
1016         LASSERT(set->set_exp);
1017         if (set->set_completes) {
1018                 /* FIXME update qos data here */
1019         }
1020
1021         if (atomic_dec_and_test(&set->set_refcount))
1022                 lov_finish_set(set);
1023
1024         RETURN(0);
1025 }
1026
1027 int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
1028                          struct obdo *src_oa, struct lov_stripe_md *lsm,
1029                          struct obd_trans_info *oti,
1030                          struct lov_request_set **reqset)
1031 {
1032         struct lov_request_set *set;
1033         struct lov_oinfo *loi = NULL;
1034         struct lov_obd *lov = &exp->exp_obd->u.lov;
1035         int rc = 0, i;
1036         ENTRY;
1037
1038         OBD_ALLOC(set, sizeof(*set));
1039         if (set == NULL)
1040                 RETURN(-ENOMEM);
1041         lov_init_set(set);
1042
1043         set->set_exp = exp;
1044         set->set_oi = oinfo;
1045         set->set_oi->oi_md = lsm;
1046         set->set_oi->oi_oa = src_oa;
1047         set->set_oti = oti;
1048         if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
1049                 set->set_cookies = oti->oti_logcookies;
1050
1051         loi = lsm->lsm_oinfo;
1052         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1053                 struct lov_request *req;
1054
1055                 if (!lov->lov_tgts[loi->loi_ost_idx] || 
1056                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1057                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1058                         continue;
1059                 }
1060
1061                 OBD_ALLOC(req, sizeof(*req));
1062                 if (req == NULL)
1063                         GOTO(out_set, rc = -ENOMEM);
1064
1065                 req->rq_stripe = i;
1066                 req->rq_idx = loi->loi_ost_idx;
1067
1068                 req->rq_oi.oi_oa = obdo_alloc();
1069                 if (req->rq_oi.oi_oa == NULL) {
1070                         OBD_FREE(req, sizeof(*req));
1071                         GOTO(out_set, rc = -ENOMEM);
1072                 }
1073                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1074                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1075                 lov_set_add_req(req, set);
1076         }
1077         if (!set->set_count)
1078                 GOTO(out_set, rc = -EIO);
1079         *reqset = set;
1080         RETURN(rc);
1081 out_set:
1082         lov_fini_destroy_set(set);
1083         RETURN(rc);
1084 }
1085
1086 int lov_fini_setattr_set(struct lov_request_set *set)
1087 {
1088         int rc = 0;
1089         ENTRY;
1090
1091         if (set == NULL)
1092                 RETURN(0);
1093         LASSERT(set->set_exp);
1094         if (set->set_completes) {
1095                 rc = common_attr_done(set);
1096                 /* FIXME update qos data here */
1097         }
1098
1099         if (atomic_dec_and_test(&set->set_refcount))
1100                 lov_finish_set(set);
1101         RETURN(rc);
1102 }
1103
1104 int lov_update_setattr_set(struct lov_request_set *set,
1105                            struct lov_request *req, int rc)
1106 {
1107         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1108         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1109         ENTRY;
1110
1111         lov_update_set(set, req, rc);
1112
1113         /* grace error on inactive ost */
1114         if (rc && !(lov->lov_tgts[req->rq_idx] && 
1115                     lov->lov_tgts[req->rq_idx]->ltd_active))
1116                 rc = 0;
1117
1118         if (rc == 0) {
1119                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME)
1120                         lsm->lsm_oinfo[req->rq_stripe].loi_lvb.lvb_ctime =
1121                                 req->rq_oi.oi_oa->o_ctime;
1122                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME)
1123                         lsm->lsm_oinfo[req->rq_stripe].loi_lvb.lvb_mtime =
1124                                 req->rq_oi.oi_oa->o_mtime;
1125                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME)
1126                         lsm->lsm_oinfo[req->rq_stripe].loi_lvb.lvb_atime =
1127                                 req->rq_oi.oi_oa->o_atime;
1128         }
1129
1130         RETURN(rc);
1131 }
1132
1133 /* The callback for osc_setattr_async that finilizes a request info when a
1134  * response is recieved. */
1135 static int cb_setattr_update(struct obd_info *oinfo, int rc)
1136 {
1137         struct lov_request *lovreq;
1138         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1139         return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
1140 }
1141
1142 int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
1143                          struct obd_trans_info *oti,
1144                          struct lov_request_set **reqset)
1145 {
1146         struct lov_request_set *set;
1147         struct lov_oinfo *loi = NULL;
1148         struct lov_obd *lov = &exp->exp_obd->u.lov;
1149         int rc = 0, i;
1150         ENTRY;
1151
1152         OBD_ALLOC(set, sizeof(*set));
1153         if (set == NULL)
1154                 RETURN(-ENOMEM);
1155         lov_init_set(set);
1156
1157         set->set_exp = exp;
1158         set->set_oti = oti;
1159         set->set_oi = oinfo;
1160         if (oti != NULL && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
1161                 set->set_cookies = oti->oti_logcookies;
1162
1163         loi = oinfo->oi_md->lsm_oinfo;
1164         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++, loi++) {
1165                 struct lov_request *req;
1166
1167                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1168                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1169                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1170                         continue;
1171                 }
1172
1173                 OBD_ALLOC(req, sizeof(*req));
1174                 if (req == NULL)
1175                         GOTO(out_set, rc = -ENOMEM);
1176                 req->rq_stripe = i;
1177                 req->rq_idx = loi->loi_ost_idx;
1178
1179                 req->rq_oi.oi_oa = obdo_alloc();
1180                 if (req->rq_oi.oi_oa == NULL) {
1181                         OBD_FREE(req, sizeof(*req));
1182                         GOTO(out_set, rc = -ENOMEM);
1183                 }
1184                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1185                        sizeof(*req->rq_oi.oi_oa));
1186                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1187                 req->rq_oi.oi_oa->o_stripe_idx = i;
1188                 req->rq_oi.oi_cb_up = cb_setattr_update;
1189                 req->rq_rqset = set;
1190
1191                 if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
1192                         int off = lov_stripe_offset(oinfo->oi_md,
1193                                                     oinfo->oi_oa->o_size, i,
1194                                                     &req->rq_oi.oi_oa->o_size);
1195
1196                         if (off < 0 && req->rq_oi.oi_oa->o_size)
1197                                 req->rq_oi.oi_oa->o_size--;
1198
1199                         CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1200                                i, req->rq_oi.oi_oa->o_size,
1201                                oinfo->oi_oa->o_size);
1202                 }
1203                 lov_set_add_req(req, set);
1204         }
1205         if (!set->set_count)
1206                 GOTO(out_set, rc = -EIO);
1207         *reqset = set;
1208         RETURN(rc);
1209 out_set:
1210         lov_fini_setattr_set(set);
1211         RETURN(rc);
1212 }
1213
1214 int lov_fini_punch_set(struct lov_request_set *set)
1215 {
1216         int rc = 0;
1217         ENTRY;
1218
1219         if (set == NULL)
1220                 RETURN(0);
1221         LASSERT(set->set_exp);
1222         if (set->set_completes) {
1223                 if (!set->set_success)
1224                         rc = -EIO;
1225                 /* FIXME update qos data here */
1226         }
1227
1228         if (atomic_dec_and_test(&set->set_refcount))
1229                 lov_finish_set(set);
1230
1231         RETURN(rc);
1232 }
1233
1234 /* The callback for osc_punch that finilizes a request info when a response
1235  * is recieved. */
1236 static int cb_update_punch(struct obd_info *oinfo, int rc)
1237 {
1238         struct lov_request *lovreq;
1239         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1240         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
1241 }
1242
1243 int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo,
1244                        struct obd_trans_info *oti,
1245                        struct lov_request_set **reqset)
1246 {
1247         struct lov_request_set *set;
1248         struct lov_oinfo *loi = NULL;
1249         struct lov_obd *lov = &exp->exp_obd->u.lov;
1250         int rc = 0, i;
1251         ENTRY;
1252
1253         OBD_ALLOC(set, sizeof(*set));
1254         if (set == NULL)
1255                 RETURN(-ENOMEM);
1256         lov_init_set(set);
1257
1258         set->set_oi = oinfo;
1259         set->set_exp = exp;
1260
1261         loi = oinfo->oi_md->lsm_oinfo;
1262         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++, loi++) {
1263                 struct lov_request *req;
1264                 obd_off rs, re;
1265
1266                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1267                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1268                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1269                         continue;
1270                 }
1271
1272                 if (!lov_stripe_intersects(oinfo->oi_md, i,
1273                                            oinfo->oi_policy.l_extent.start,
1274                                            oinfo->oi_policy.l_extent.end,
1275                                            &rs, &re))
1276                         continue;
1277
1278                 OBD_ALLOC(req, sizeof(*req));
1279                 if (req == NULL)
1280                         GOTO(out_set, rc = -ENOMEM);
1281                 req->rq_stripe = i;
1282                 req->rq_idx = loi->loi_ost_idx;
1283
1284                 req->rq_oi.oi_oa = obdo_alloc();
1285                 if (req->rq_oi.oi_oa == NULL) {
1286                         OBD_FREE(req, sizeof(*req));
1287                         GOTO(out_set, rc = -ENOMEM);
1288                 }
1289                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1290                        sizeof(*req->rq_oi.oi_oa));
1291                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1292                 req->rq_oi.oi_oa->o_stripe_idx = i;
1293                 req->rq_oi.oi_cb_up = cb_update_punch;
1294                 req->rq_rqset = set;
1295
1296                 req->rq_oi.oi_policy.l_extent.start = rs;
1297                 req->rq_oi.oi_policy.l_extent.end = re;
1298                 req->rq_oi.oi_policy.l_extent.gid = -1;
1299
1300                 lov_set_add_req(req, set);
1301         }
1302         if (!set->set_count)
1303                 GOTO(out_set, rc = -EIO);
1304         *reqset = set;
1305         RETURN(rc);
1306 out_set:
1307         lov_fini_punch_set(set);
1308         RETURN(rc);
1309 }
1310
1311 int lov_fini_sync_set(struct lov_request_set *set)
1312 {
1313         int rc = 0;
1314         ENTRY;
1315
1316         if (set == NULL)
1317                 RETURN(0);
1318         LASSERT(set->set_exp);
1319         if (set->set_completes) {
1320                 if (!set->set_success)
1321                         rc = -EIO;
1322                 /* FIXME update qos data here */
1323         }
1324
1325         if (atomic_dec_and_test(&set->set_refcount))
1326                 lov_finish_set(set);
1327
1328         RETURN(rc);
1329 }
1330
1331 int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
1332                       struct obdo *src_oa, struct lov_stripe_md *lsm,
1333                       obd_off start, obd_off end,
1334                       struct lov_request_set **reqset)
1335 {
1336         struct lov_request_set *set;
1337         struct lov_oinfo *loi = NULL;
1338         struct lov_obd *lov = &exp->exp_obd->u.lov;
1339         int rc = 0, i;
1340         ENTRY;
1341
1342         OBD_ALLOC(set, sizeof(*set));
1343         if (set == NULL)
1344                 RETURN(-ENOMEM);
1345         lov_init_set(set);
1346
1347         set->set_exp = exp;
1348         set->set_oi = oinfo;
1349         set->set_oi->oi_md = lsm;
1350         set->set_oi->oi_oa = src_oa;
1351
1352         loi = lsm->lsm_oinfo;
1353         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1354                 struct lov_request *req;
1355                 obd_off rs, re;
1356
1357                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1358                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1359                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1360                         continue;
1361                 }
1362
1363                 if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
1364                         continue;
1365
1366                 OBD_ALLOC(req, sizeof(*req));
1367                 if (req == NULL)
1368                         GOTO(out_set, rc = -ENOMEM);
1369                 req->rq_stripe = i;
1370                 req->rq_idx = loi->loi_ost_idx;
1371
1372                 req->rq_oi.oi_oa = obdo_alloc();
1373                 if (req->rq_oi.oi_oa == NULL) {
1374                         OBD_FREE(req, sizeof(*req));
1375                         GOTO(out_set, rc = -ENOMEM);
1376                 }
1377                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1378                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1379                 req->rq_oi.oi_oa->o_stripe_idx = i;
1380
1381                 req->rq_oi.oi_policy.l_extent.start = rs;
1382                 req->rq_oi.oi_policy.l_extent.end = re;
1383                 req->rq_oi.oi_policy.l_extent.gid = -1;
1384
1385                 lov_set_add_req(req, set);
1386         }
1387         if (!set->set_count)
1388                 GOTO(out_set, rc = -EIO);
1389         *reqset = set;
1390         RETURN(rc);
1391 out_set:
1392         lov_fini_sync_set(set);
1393         RETURN(rc);
1394 }
1395
1396 #define LOV_U64_MAX ((__u64)~0ULL)
1397 #define LOV_SUM_MAX(tot, add)                                           \
1398         do {                                                            \
1399                 if ((tot) + (add) < (tot))                              \
1400                         (tot) = LOV_U64_MAX;                            \
1401                 else                                                    \
1402                         (tot) += (add);                                 \
1403         } while(0)
1404
1405 int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success)
1406 {
1407         ENTRY;
1408
1409         if (success) {
1410                 __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov, 0);
1411
1412                 if (osfs->os_files != LOV_U64_MAX)
1413                         do_div(osfs->os_files, expected_stripes);
1414                 if (osfs->os_ffree != LOV_U64_MAX)
1415                         do_div(osfs->os_ffree, expected_stripes);
1416
1417                 spin_lock(&obd->obd_osfs_lock);
1418                 memcpy(&obd->obd_osfs, osfs, sizeof(osfs));
1419                 obd->obd_osfs_age = cfs_time_current_64();
1420                 spin_unlock(&obd->obd_osfs_lock);
1421                 RETURN(0);
1422         }
1423
1424         RETURN(-EIO);
1425 }
1426
1427 int lov_fini_statfs_set(struct lov_request_set *set)
1428 {
1429         int rc = 0;
1430         ENTRY;
1431
1432         if (set == NULL)
1433                 RETURN(0);
1434
1435         if (set->set_completes) {
1436                 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
1437                                      set->set_success);
1438         }
1439
1440         if (atomic_dec_and_test(&set->set_refcount))
1441                 lov_finish_set(set);
1442
1443         RETURN(rc);
1444 }
1445
1446 void lov_update_statfs(struct obd_device *obd, struct obd_statfs *osfs,
1447                        struct obd_statfs *lov_sfs, int success)
1448 {
1449         spin_lock(&obd->obd_osfs_lock);
1450         memcpy(&obd->obd_osfs, lov_sfs, sizeof(osfs));
1451         obd->obd_osfs_age = cfs_time_current_64();
1452         spin_unlock(&obd->obd_osfs_lock);
1453
1454         if (success == 0) {
1455                 memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
1456         } else {
1457 #ifdef MIN_DF
1458                 /* Sandia requested that df (and so, statfs) only
1459                    returned minimal available space on
1460                    a single OST, so people would be able to
1461                    write this much data guaranteed. */
1462                 if (osfs->os_bavail > lov_sfs->os_bavail) {
1463                         /* Presumably if new bavail is smaller,
1464                            new bfree is bigger as well */
1465                         osfs->os_bfree = lov_sfs->os_bfree;
1466                         osfs->os_bavail = lov_sfs->os_bavail;
1467                 }
1468 #else
1469                 osfs->os_bfree += lov_sfs->os_bfree;
1470                 osfs->os_bavail += lov_sfs->os_bavail;
1471 #endif
1472                 osfs->os_blocks += lov_sfs->os_blocks;
1473                 /* XXX not sure about this one - depends on policy.
1474                  *   - could be minimum if we always stripe on all OBDs
1475                  *     (but that would be wrong for any other policy,
1476                  *     if one of the OBDs has no more objects left)
1477                  *   - could be sum if we stripe whole objects
1478                  *   - could be average, just to give a nice number
1479                  *
1480                  * To give a "reasonable" (if not wholly accurate)
1481                  * number, we divide the total number of free objects
1482                  * by expected stripe count (watch out for overflow).
1483                  */
1484                 LOV_SUM_MAX(osfs->os_files, lov_sfs->os_files);
1485                 LOV_SUM_MAX(osfs->os_ffree, lov_sfs->os_ffree);
1486         }
1487 }
1488
1489 /* The callback for osc_statfs_async that finilizes a request info when a
1490  * response is recieved. */
1491 static int cb_statfs_update(struct obd_info *oinfo, int rc)
1492 {
1493         struct lov_request *lovreq;
1494         struct obd_statfs *osfs, *lov_sfs;
1495         struct obd_device *obd;
1496         struct lov_obd *lov;
1497         int success;
1498         ENTRY;
1499
1500         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1501         lov = &lovreq->rq_rqset->set_obd->u.lov;
1502         obd = class_exp2obd(lov->lov_tgts[lovreq->rq_idx]->ltd_exp);
1503
1504         osfs = lovreq->rq_rqset->set_oi->oi_osfs;
1505         lov_sfs = oinfo->oi_osfs;
1506
1507         success = lovreq->rq_rqset->set_success;
1508
1509         /* XXX: the same is done in lov_update_common_set, however
1510            lovset->set_exp is not initialized. */
1511         lov_update_set(lovreq->rq_rqset, lovreq, rc);
1512         if (rc) {
1513                 if (rc && !(lov->lov_tgts[lovreq->rq_idx] &&
1514                             lov->lov_tgts[lovreq->rq_idx]->ltd_active))
1515                         rc = 0;
1516                 RETURN(rc);
1517         }
1518
1519         lov_update_statfs(obd, osfs, lov_sfs, success);
1520         RETURN(0);
1521 }
1522
1523 int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
1524                         struct lov_request_set **reqset)
1525 {
1526         struct lov_request_set *set;
1527         struct lov_obd *lov = &obd->u.lov;
1528         int rc = 0, i;
1529         ENTRY;
1530
1531         OBD_ALLOC(set, sizeof(*set));
1532         if (set == NULL)
1533                 RETURN(-ENOMEM);
1534         lov_init_set(set);
1535
1536         set->set_obd = obd;
1537         set->set_oi = oinfo;
1538
1539         /* We only get block data from the OBD */
1540         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1541                 struct lov_request *req;
1542
1543                 if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_active) {
1544                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
1545                         continue;
1546                 }
1547
1548                 OBD_ALLOC(req, sizeof(*req));
1549                 if (req == NULL)
1550                         GOTO(out_set, rc = -ENOMEM);
1551
1552                 OBD_ALLOC(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs));
1553                 if (req->rq_oi.oi_osfs == NULL) {
1554                         OBD_FREE(req, sizeof(*req));
1555                         GOTO(out_set, rc = -ENOMEM);
1556                 }
1557
1558                 req->rq_idx = i;
1559                 req->rq_oi.oi_cb_up = cb_statfs_update;
1560                 req->rq_rqset = set;
1561
1562                 lov_set_add_req(req, set);
1563         }
1564         if (!set->set_count)
1565                 GOTO(out_set, rc = -EIO);
1566         *reqset = set;
1567         RETURN(rc);
1568 out_set:
1569         lov_fini_statfs_set(set);
1570         RETURN(rc);
1571 }