Whamcloud - gitweb
8262a86833e63ef05233fe72fd6aa29f1520ffcf
[fs/lustre-release.git] / lustre / lov / lov_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of the Lustre file system, http://www.lustre.org
7  *   Lustre is a trademark of Cluster File Systems, Inc.
8  *
9  *   You may have signed or agreed to another license before downloading
10  *   this software.  If so, you are bound by the terms and conditions
11  *   of that agreement, and the following does not apply to you.  See the
12  *   LICENSE file included with this distribution for more information.
13  *
14  *   If you did not agree to a different license, then this copy of Lustre
15  *   is open source software; you can redistribute it and/or modify it
16  *   under the terms of version 2 of the GNU General Public License as
17  *   published by the Free Software Foundation.
18  *
19  *   In either case, Lustre is distributed in the hope that it will be
20  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
21  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22  *   license text for more details.
23  */
24
25 #ifndef EXPORT_SYMTAB
26 # define EXPORT_SYMTAB
27 #endif
28 #define DEBUG_SUBSYSTEM S_LOV
29
30 #ifdef __KERNEL__
31 #include <libcfs/libcfs.h>
32 #else
33 #include <liblustre.h>
34 #endif
35
36 #include <obd_class.h>
37 #include <obd_lov.h>
38 #include <lustre/lustre_idl.h>
39
40 #include "lov_internal.h"
41
42 static void lov_init_set(struct lov_request_set *set)
43 {
44         set->set_count = 0;
45         set->set_completes = 0;
46         set->set_success = 0;
47         set->set_cookies = 0;
48         CFS_INIT_LIST_HEAD(&set->set_list);
49         atomic_set(&set->set_refcount, 1);
50 }
51
52 static void lov_finish_set(struct lov_request_set *set)
53 {
54         struct list_head *pos, *n;
55         ENTRY;
56
57         LASSERT(set);
58         list_for_each_safe(pos, n, &set->set_list) {
59                 struct lov_request *req = list_entry(pos, struct lov_request,
60                                                      rq_link);
61                 list_del_init(&req->rq_link);
62
63                 if (req->rq_oi.oi_oa)
64                         obdo_free(req->rq_oi.oi_oa);
65                 if (req->rq_oi.oi_md)
66                         OBD_FREE(req->rq_oi.oi_md, req->rq_buflen);
67                 if (req->rq_oi.oi_osfs)
68                         OBD_FREE(req->rq_oi.oi_osfs,
69                                  sizeof(*req->rq_oi.oi_osfs));
70                 OBD_FREE(req, sizeof(*req));
71         }
72
73         if (set->set_pga) {
74                 int len = set->set_oabufs * sizeof(*set->set_pga);
75                 OBD_FREE(set->set_pga, len);
76         }
77         if (set->set_lockh)
78                 lov_llh_put(set->set_lockh);
79
80         OBD_FREE(set, sizeof(*set));
81         EXIT;
82 }
83
84 void lov_update_set(struct lov_request_set *set,
85                     struct lov_request *req, int rc)
86 {
87         req->rq_complete = 1;
88         req->rq_rc = rc;
89
90         set->set_completes++;
91         if (rc == 0)
92                 set->set_success++;
93 }
94
95 int lov_update_common_set(struct lov_request_set *set,
96                           struct lov_request *req, int rc)
97 {
98         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
99         ENTRY;
100
101         lov_update_set(set, req, rc);
102
103         /* grace error on inactive ost */
104         if (rc && !(lov->lov_tgts[req->rq_idx] && 
105                     lov->lov_tgts[req->rq_idx]->ltd_active))
106                 rc = 0;
107
108         /* FIXME in raid1 regime, should return 0 */
109         RETURN(rc);
110 }
111
112 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
113 {
114         list_add_tail(&req->rq_link, &set->set_list);
115         set->set_count++;
116 }
117
118 int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc)
119 {
120         struct lov_request_set *set = req->rq_rqset;
121         struct lustre_handle *lov_lockhp;
122         struct lov_oinfo *loi;
123         ENTRY;
124
125         LASSERT(set != NULL);
126         LASSERT(set->set_oi != NULL);
127
128         lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
129         loi = &set->set_oi->oi_md->lsm_oinfo[req->rq_stripe];
130
131         /* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set
132          * and that copy can be arbitrarily out of date.
133          *
134          * The LOV API is due for a serious rewriting anyways, and this
135          * can be addressed then. */
136
137         if (rc == ELDLM_OK) {
138                 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
139                 __u64 tmp;
140
141                 LASSERT(lock != NULL);
142                 lov_stripe_lock(set->set_oi->oi_md);
143                 loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo->loi_lvb;
144                 tmp = loi->loi_lvb.lvb_size;
145                 /* Extend KMS up to the end of this lock and no further
146                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
147                 if (tmp > lock->l_policy_data.l_extent.end)
148                         tmp = lock->l_policy_data.l_extent.end + 1;
149                 if (tmp >= loi->loi_kms) {
150                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
151                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
152                         loi->loi_kms = tmp;
153                         loi->loi_kms_valid = 1;
154                 } else {
155                         LDLM_DEBUG(lock, "lock acquired, setting rss="
156                                    LPU64"; leaving kms="LPU64", end="LPU64,
157                                    loi->loi_lvb.lvb_size, loi->loi_kms,
158                                    lock->l_policy_data.l_extent.end);
159                 }
160                 lov_stripe_unlock(set->set_oi->oi_md);
161                 ldlm_lock_allow_match(lock);
162                 LDLM_LOCK_PUT(lock);
163         } else if ((rc == ELDLM_LOCK_ABORTED) &&
164                    (set->set_ei->ei_flags & LDLM_FL_HAS_INTENT)) {
165                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
166                 lov_stripe_lock(set->set_oi->oi_md);
167                 loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo->loi_lvb;
168                 lov_stripe_unlock(set->set_oi->oi_md);
169                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
170                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
171                 rc = ELDLM_OK;
172         } else {
173                 struct obd_export *exp = set->set_exp;
174                 struct lov_obd *lov = &exp->exp_obd->u.lov;
175
176                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
177                 if (lov->lov_tgts[req->rq_idx] &&
178                     lov->lov_tgts[req->rq_idx]->ltd_active) {
179                         if (rc != -EINTR)
180                                 CERROR("enqueue objid "LPX64" subobj "
181                                        LPX64" on OST idx %d: rc %d\n",
182                                        set->set_oi->oi_md->lsm_object_id,
183                                        loi->loi_id, loi->loi_ost_idx, rc);
184                 } else {
185                         rc = ELDLM_OK;
186                 }
187         }
188         lov_update_set(set, req, rc);
189         RETURN(rc);
190 }
191
192 /* The callback for osc_enqueue that updates lov info for every OSC request. */
193 static int cb_update_enqueue(struct obd_info *oinfo, int rc)
194 {
195         struct obd_enqueue_info *einfo;
196         struct lov_request *lovreq;
197
198         lovreq = container_of(oinfo, struct lov_request, rq_oi);
199         einfo = lovreq->rq_rqset->set_ei;
200         return lov_update_enqueue_set(lovreq, einfo->ei_mode, rc);
201 }
202
203 static int enqueue_done(struct lov_request_set *set, __u32 mode)
204 {
205         struct lov_request *req;
206         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
207         int rc = 0;
208         ENTRY;
209
210         /* enqueue/match success, just return */
211         if (set->set_completes && set->set_completes == set->set_success)
212                 RETURN(0);
213
214         /* cancel enqueued/matched locks */
215         list_for_each_entry(req, &set->set_list, rq_link) {
216                 struct lustre_handle *lov_lockhp;
217
218                 if (!req->rq_complete || req->rq_rc)
219                         continue;
220
221                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
222                 LASSERT(lov_lockhp);
223                 if (!lustre_handle_is_used(lov_lockhp))
224                         continue;
225
226                 rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp,
227                                 req->rq_oi.oi_md, mode, lov_lockhp);
228                 if (rc && lov->lov_tgts[req->rq_idx] &&
229                     lov->lov_tgts[req->rq_idx]->ltd_active)
230                         CERROR("cancelling obdjid "LPX64" on OST "
231                                "idx %d error: rc = %d\n",
232                                req->rq_oi.oi_md->lsm_object_id,
233                                req->rq_idx, rc);
234         }
235         if (set->set_lockh)
236                 lov_llh_put(set->set_lockh);
237         RETURN(rc);
238 }
239
240 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc)
241 {
242         int ret = 0;
243         ENTRY;
244
245         if (set == NULL)
246                 RETURN(0);
247         LASSERT(set->set_exp);
248         /* Do enqueue_done only for sync requests and if any request
249          * succeeded. */
250         if (!set->set_ei->ei_rqset) {
251                 if (rc)
252                         set->set_completes = 0;
253                 ret = enqueue_done(set, mode);
254         } else if (set->set_lockh)
255                 lov_llh_put(set->set_lockh);
256
257         if (atomic_dec_and_test(&set->set_refcount))
258                 lov_finish_set(set);
259
260         RETURN(rc ? rc : ret);
261 }
262
263 int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
264                          struct obd_enqueue_info *einfo,
265                          struct lov_request_set **reqset)
266 {
267         struct lov_obd *lov = &exp->exp_obd->u.lov;
268         struct lov_request_set *set;
269         int i, rc = 0;
270         struct lov_oinfo *loi;
271         ENTRY;
272
273         OBD_ALLOC(set, sizeof(*set));
274         if (set == NULL)
275                 RETURN(-ENOMEM);
276         lov_init_set(set);
277
278         set->set_exp = exp;
279         set->set_oi = oinfo;
280         set->set_ei = einfo;
281         set->set_lockh = lov_llh_new(oinfo->oi_md);
282         if (set->set_lockh == NULL)
283                 GOTO(out_set, rc = -ENOMEM);
284         oinfo->oi_lockh->cookie = set->set_lockh->llh_handle.h_cookie;
285
286         loi = oinfo->oi_md->lsm_oinfo;
287         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++, loi++) {
288                 struct lov_request *req;
289                 obd_off start, end;
290
291                 if (!lov_stripe_intersects(oinfo->oi_md, i,
292                                            oinfo->oi_policy.l_extent.start,
293                                            oinfo->oi_policy.l_extent.end,
294                                            &start, &end))
295                         continue;
296
297                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
298                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
299                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
300                         continue;
301                 }
302
303                 OBD_ALLOC(req, sizeof(*req));
304                 if (req == NULL)
305                         GOTO(out_set, rc = -ENOMEM);
306
307                 req->rq_buflen = sizeof(*req->rq_oi.oi_md) +
308                         sizeof(struct lov_oinfo);
309                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
310                 if (req->rq_oi.oi_md == NULL) {
311                         OBD_FREE(req, sizeof(*req));
312                         GOTO(out_set, rc = -ENOMEM);
313                 }
314
315                 req->rq_rqset = set;
316                 /* Set lov request specific parameters. */
317                 req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;
318                 req->rq_oi.oi_cb_up = cb_update_enqueue;
319
320                 LASSERT(req->rq_oi.oi_lockh);
321
322                 req->rq_oi.oi_policy.l_extent.gid =
323                         oinfo->oi_policy.l_extent.gid;
324                 req->rq_oi.oi_policy.l_extent.start = start;
325                 req->rq_oi.oi_policy.l_extent.end = end;
326
327                 req->rq_idx = loi->loi_ost_idx;
328                 req->rq_stripe = i;
329
330                 /* XXX LOV STACKING: submd should be from the subobj */
331                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
332                 req->rq_oi.oi_md->lsm_stripe_count = 0;
333                 req->rq_oi.oi_md->lsm_oinfo->loi_kms_valid =
334                         loi->loi_kms_valid;
335                 req->rq_oi.oi_md->lsm_oinfo->loi_kms = loi->loi_kms;
336                 req->rq_oi.oi_md->lsm_oinfo->loi_lvb = loi->loi_lvb;
337
338                 lov_set_add_req(req, set);
339         }
340         if (!set->set_count)
341                 GOTO(out_set, rc = -EIO);
342         *reqset = set;
343         RETURN(0);
344 out_set:
345         lov_fini_enqueue_set(set, einfo->ei_mode, rc);
346         RETURN(rc);
347 }
348
349 int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
350                          int rc)
351 {
352         int ret = rc;
353         ENTRY;
354
355         if (rc == 1)
356                 ret = 0;
357         else if (rc == 0)
358                 ret = 1;
359         lov_update_set(set, req, ret);
360         RETURN(rc);
361 }
362
363 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
364 {
365         int rc = 0;
366         ENTRY;
367
368         if (set == NULL)
369                 RETURN(0);
370         LASSERT(set->set_exp);
371         rc = enqueue_done(set, mode);
372         if ((set->set_count == set->set_success) &&
373             (flags & LDLM_FL_TEST_LOCK))
374                 lov_llh_put(set->set_lockh);
375
376         if (atomic_dec_and_test(&set->set_refcount))
377                 lov_finish_set(set);
378
379         RETURN(rc);
380 }
381
382 int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo,
383                        struct lov_stripe_md *lsm, ldlm_policy_data_t *policy,
384                        __u32 mode, struct lustre_handle *lockh,
385                        struct lov_request_set **reqset)
386 {
387         struct lov_obd *lov = &exp->exp_obd->u.lov;
388         struct lov_request_set *set;
389         int i, rc = 0;
390         struct lov_oinfo *loi;
391         ENTRY;
392
393         OBD_ALLOC(set, sizeof(*set));
394         if (set == NULL)
395                 RETURN(-ENOMEM);
396         lov_init_set(set);
397
398         set->set_exp = exp;
399         set->set_oi = oinfo;
400         set->set_oi->oi_md = lsm;
401         set->set_lockh = lov_llh_new(lsm);
402         if (set->set_lockh == NULL)
403                 GOTO(out_set, rc = -ENOMEM);
404         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
405
406         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++, loi++){
407                 struct lov_request *req;
408                 obd_off start, end;
409
410                 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
411                                            policy->l_extent.end, &start, &end))
412                         continue;
413
414                 /* FIXME raid1 should grace this error */
415                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
416                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
417                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
418                         GOTO(out_set, rc = -EIO);
419                 }
420
421                 OBD_ALLOC(req, sizeof(*req));
422                 if (req == NULL)
423                         GOTO(out_set, rc = -ENOMEM);
424
425                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
426                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
427                 if (req->rq_oi.oi_md == NULL) {
428                         OBD_FREE(req, sizeof(*req));
429                         GOTO(out_set, rc = -ENOMEM);
430                 }
431
432                 req->rq_oi.oi_policy.l_extent.start = start;
433                 req->rq_oi.oi_policy.l_extent.end = end;
434                 req->rq_oi.oi_policy.l_extent.gid = policy->l_extent.gid;
435
436                 req->rq_idx = loi->loi_ost_idx;
437                 req->rq_stripe = i;
438
439                 /* XXX LOV STACKING: submd should be from the subobj */
440                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
441                 req->rq_oi.oi_md->lsm_stripe_count = 0;
442
443                 lov_set_add_req(req, set);
444         }
445         if (!set->set_count)
446                 GOTO(out_set, rc = -EIO);
447         *reqset = set;
448         RETURN(rc);
449 out_set:
450         lov_fini_match_set(set, mode, 0);
451         RETURN(rc);
452 }
453
454 int lov_fini_cancel_set(struct lov_request_set *set)
455 {
456         int rc = 0;
457         ENTRY;
458
459         if (set == NULL)
460                 RETURN(0);
461
462         LASSERT(set->set_exp);
463         if (set->set_lockh)
464                 lov_llh_put(set->set_lockh);
465
466         if (atomic_dec_and_test(&set->set_refcount))
467                 lov_finish_set(set);
468
469         RETURN(rc);
470 }
471
472 int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,
473                         struct lov_stripe_md *lsm, __u32 mode,
474                         struct lustre_handle *lockh,
475                         struct lov_request_set **reqset)
476 {
477         struct lov_request_set *set;
478         int i, rc = 0;
479         struct lov_oinfo *loi;
480         ENTRY;
481
482         OBD_ALLOC(set, sizeof(*set));
483         if (set == NULL)
484                 RETURN(-ENOMEM);
485         lov_init_set(set);
486
487         set->set_exp = exp;
488         set->set_oi = oinfo;
489         set->set_oi->oi_md = lsm;
490         set->set_lockh = lov_handle2llh(lockh);
491         if (set->set_lockh == NULL) {
492                 CERROR("LOV: invalid lov lock handle %p\n", lockh);
493                 GOTO(out_set, rc = -EINVAL);
494         }
495         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
496
497         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++, loi++){
498                 struct lov_request *req;
499                 struct lustre_handle *lov_lockhp;
500
501                 lov_lockhp = set->set_lockh->llh_handles + i;
502                 if (!lustre_handle_is_used(lov_lockhp)) {
503                         CDEBUG(D_HA, "lov idx %d subobj "LPX64" no lock?\n",
504                                loi->loi_ost_idx, loi->loi_id);
505                         continue;
506                 }
507
508                 OBD_ALLOC(req, sizeof(*req));
509                 if (req == NULL)
510                         GOTO(out_set, rc = -ENOMEM);
511
512                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
513                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
514                 if (req->rq_oi.oi_md == NULL) {
515                         OBD_FREE(req, sizeof(*req));
516                         GOTO(out_set, rc = -ENOMEM);
517                 }
518
519                 req->rq_idx = loi->loi_ost_idx;
520                 req->rq_stripe = i;
521
522                 /* XXX LOV STACKING: submd should be from the subobj */
523                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
524                 req->rq_oi.oi_md->lsm_stripe_count = 0;
525
526                 lov_set_add_req(req, set);
527         }
528         if (!set->set_count)
529                 GOTO(out_set, rc = -EIO);
530         *reqset = set;
531         RETURN(rc);
532 out_set:
533         lov_fini_cancel_set(set);
534         RETURN(rc);
535 }
536
537 static int create_done(struct obd_export *exp, struct lov_request_set *set,
538                        struct lov_stripe_md **lsmp)
539 {
540         struct lov_obd *lov = &exp->exp_obd->u.lov;
541         struct obd_trans_info *oti = set->set_oti;
542         struct obdo *src_oa = set->set_oi->oi_oa;
543         struct lov_request *req;
544         struct obdo *ret_oa = NULL;
545         int attrset = 0, rc = 0;
546         ENTRY;
547
548         LASSERT(set->set_completes);
549
550         /* try alloc objects on other osts if osc_create fails for
551          * exceptions: RPC failure, ENOSPC, etc */
552         if (set->set_count != set->set_success) {
553                 list_for_each_entry (req, &set->set_list, rq_link) {
554                         if (req->rq_rc == 0)
555                                 continue;
556
557                         set->set_completes--;
558                         req->rq_complete = 0;
559
560                         rc = qos_remedy_create(set, req);
561                         lov_update_create_set(set, req, rc);
562
563                         if (rc)
564                                 break;
565                 }
566         }
567
568         /* no successful creates */
569         if (set->set_success == 0)
570                 GOTO(cleanup, rc);
571
572         /* If there was an explicit stripe set, fail.  Otherwise, we
573          * got some objects and that's not bad. */
574         if (set->set_count != set->set_success) {
575                 if (*lsmp)
576                         GOTO(cleanup, rc);
577                 set->set_count = set->set_success;
578                 qos_shrink_lsm(set);
579         }
580
581         ret_oa = obdo_alloc();
582         if (ret_oa == NULL)
583                 GOTO(cleanup, rc = -ENOMEM);
584
585         list_for_each_entry(req, &set->set_list, rq_link) {
586                 if (!req->rq_complete || req->rq_rc)
587                         continue;
588                 lov_merge_attrs(ret_oa, req->rq_oi.oi_oa,
589                                 req->rq_oi.oi_oa->o_valid, set->set_oi->oi_md,
590                                 req->rq_stripe, &attrset);
591         }
592         if (src_oa->o_valid & OBD_MD_FLSIZE &&
593             ret_oa->o_size != src_oa->o_size) {
594                 CERROR("original size "LPU64" isn't new object size "LPU64"\n",
595                        src_oa->o_size, ret_oa->o_size);
596                 LBUG();
597         }
598         ret_oa->o_id = src_oa->o_id;
599         memcpy(src_oa, ret_oa, sizeof(*src_oa));
600         obdo_free(ret_oa);
601
602         *lsmp = set->set_oi->oi_md;
603         GOTO(done, rc = 0);
604
605 cleanup:
606         list_for_each_entry(req, &set->set_list, rq_link) {
607                 struct obd_export *sub_exp;
608                 int err = 0;
609
610                 if (!req->rq_complete || req->rq_rc)
611                         continue;
612
613                 sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp;
614                 err = obd_destroy(sub_exp, req->rq_oi.oi_oa, NULL, oti, NULL);
615                 if (err)
616                         CERROR("Failed to uncreate objid "LPX64" subobj "
617                                LPX64" on OST idx %d: rc = %d\n",
618                                src_oa->o_id, req->rq_oi.oi_oa->o_id,
619                                req->rq_idx, rc);
620         }
621         if (*lsmp == NULL)
622                 obd_free_memmd(exp, &set->set_oi->oi_md);
623 done:
624         if (oti && set->set_cookies) {
625                 oti->oti_logcookies = set->set_cookies;
626                 if (!set->set_cookie_sent) {
627                         oti_free_cookies(oti);
628                         src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
629                 } else {
630                         src_oa->o_valid |= OBD_MD_FLCOOKIE;
631                 }
632         }
633         RETURN(rc);
634 }
635
636 int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp)
637 {
638         int rc = 0;
639         ENTRY;
640
641         if (set == NULL)
642                 RETURN(0);
643         LASSERT(set->set_exp);
644         if (set->set_completes)
645                 rc = create_done(set->set_exp, set, lsmp);
646
647         if (atomic_dec_and_test(&set->set_refcount))
648                 lov_finish_set(set);
649
650         RETURN(rc);
651 }
652
653 int lov_update_create_set(struct lov_request_set *set,
654                           struct lov_request *req, int rc)
655 {
656         struct obd_trans_info *oti = set->set_oti;
657         struct lov_stripe_md *lsm = set->set_oi->oi_md;
658         struct lov_oinfo *loi;
659         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
660         ENTRY;
661
662         req->rq_stripe = set->set_success;
663         loi = &lsm->lsm_oinfo[req->rq_stripe];
664
665         if (rc && lov->lov_tgts[req->rq_idx] &&
666             lov->lov_tgts[req->rq_idx]->ltd_active) {
667                 CERROR("error creating fid "LPX64" sub-object"
668                        " on OST idx %d/%d: rc = %d\n",
669                        set->set_oi->oi_oa->o_id, req->rq_idx,
670                        lsm->lsm_stripe_count, rc);
671                 if (rc > 0) {
672                         CERROR("obd_create returned invalid err %d\n", rc);
673                         rc = -EIO;
674                 }
675         }
676         lov_update_set(set, req, rc);
677         if (rc)
678                 RETURN(rc);
679
680         if (oti && oti->oti_objid)
681                 oti->oti_objid[req->rq_idx] = req->rq_oi.oi_oa->o_id;
682
683         loi->loi_id = req->rq_oi.oi_oa->o_id;
684         loi->loi_ost_idx = req->rq_idx;
685         CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n",
686                lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
687         loi_init(loi);
688
689         if (oti && set->set_cookies)
690                 ++oti->oti_logcookies;
691         if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE)
692                 set->set_cookie_sent++;
693
694         RETURN(0);
695 }
696
697 int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo,
698                         struct lov_stripe_md **lsmp, struct obdo *src_oa,
699                         struct obd_trans_info *oti,
700                         struct lov_request_set **reqset)
701 {
702         struct lov_request_set *set;
703         int rc = 0;
704         ENTRY;
705
706         OBD_ALLOC(set, sizeof(*set));
707         if (set == NULL)
708                 RETURN(-ENOMEM);
709         lov_init_set(set);
710
711         set->set_exp = exp;
712         set->set_oi = oinfo;
713         set->set_oi->oi_md = *lsmp;
714         set->set_oi->oi_oa = src_oa;
715         set->set_oti = oti;
716
717         rc = qos_prep_create(exp, set);
718         if (rc)
719                 lov_fini_create_set(set, lsmp);
720         else
721                 *reqset = set;
722         RETURN(rc);
723 }
724
725 static int common_attr_done(struct lov_request_set *set)
726 {
727         struct list_head *pos;
728         struct lov_request *req;
729         struct obdo *tmp_oa;
730         int rc = 0, attrset = 0;
731         ENTRY;
732
733         LASSERT(set->set_oi != NULL);
734
735         if (set->set_oi->oi_oa == NULL)
736                 RETURN(0);
737
738         if (!set->set_success)
739                 RETURN(-EIO);
740
741         tmp_oa = obdo_alloc();
742         if (tmp_oa == NULL)
743                 GOTO(out, rc = -ENOMEM);
744
745         list_for_each (pos, &set->set_list) {
746                 req = list_entry(pos, struct lov_request, rq_link);
747
748                 if (!req->rq_complete || req->rq_rc)
749                         continue;
750                 if (req->rq_oi.oi_oa->o_valid == 0)   /* inactive stripe */
751                         continue;
752                 lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa,
753                                 req->rq_oi.oi_oa->o_valid,
754                                 set->set_oi->oi_md, req->rq_stripe, &attrset);
755         }
756         if (!attrset) {
757                 CERROR("No stripes had valid attrs\n");
758                 rc = -EIO;
759         }
760         tmp_oa->o_id = set->set_oi->oi_oa->o_id;
761         memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
762 out:
763         if (tmp_oa)
764                 obdo_free(tmp_oa);
765         RETURN(rc);
766
767 }
768
769 static int brw_done(struct lov_request_set *set)
770 {
771         struct lov_stripe_md *lsm = set->set_oi->oi_md;
772         struct lov_oinfo     *loi = NULL;
773         struct list_head *pos;
774         struct lov_request *req;
775         ENTRY;
776
777         list_for_each (pos, &set->set_list) {
778                 req = list_entry(pos, struct lov_request, rq_link);
779
780                 if (!req->rq_complete || req->rq_rc)
781                         continue;
782
783                 loi = &lsm->lsm_oinfo[req->rq_stripe];
784
785                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS)
786                         loi->loi_lvb.lvb_blocks = req->rq_oi.oi_oa->o_blocks;
787         }
788
789         RETURN(0);
790 }
791
792 int lov_fini_brw_set(struct lov_request_set *set)
793 {
794         int rc = 0;
795         ENTRY;
796
797         if (set == NULL)
798                 RETURN(0);
799         LASSERT(set->set_exp);
800         if (set->set_completes) {
801                 rc = brw_done(set);
802                 /* FIXME update qos data here */
803         }
804         if (atomic_dec_and_test(&set->set_refcount))
805                 lov_finish_set(set);
806
807         RETURN(rc);
808 }
809
810 int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
811                      obd_count oa_bufs, struct brw_page *pga,
812                      struct obd_trans_info *oti,
813                      struct lov_request_set **reqset)
814 {
815         struct {
816                 obd_count       index;
817                 obd_count       count;
818                 obd_count       off;
819         } *info = NULL;
820         struct lov_request_set *set;
821         struct lov_oinfo *loi = NULL;
822         struct lov_obd *lov = &exp->exp_obd->u.lov;
823         int rc = 0, i, shift;
824         ENTRY;
825
826         OBD_ALLOC(set, sizeof(*set));
827         if (set == NULL)
828                 RETURN(-ENOMEM);
829         lov_init_set(set);
830
831         set->set_exp = exp;
832         set->set_oti = oti;
833         set->set_oi = oinfo;
834         set->set_oabufs = oa_bufs;
835         OBD_ALLOC(set->set_pga, oa_bufs * sizeof(*set->set_pga));
836         if (!set->set_pga)
837                 GOTO(out, rc = -ENOMEM);
838
839         OBD_ALLOC(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
840         if (!info)
841                 GOTO(out, rc = -ENOMEM);
842
843         /* calculate the page count for each stripe */
844         for (i = 0; i < oa_bufs; i++) {
845                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
846                 info[stripe].count++;
847         }
848
849         /* alloc and initialize lov request */
850         shift = 0;
851         for (i = 0, loi = oinfo->oi_md->lsm_oinfo;
852              i < oinfo->oi_md->lsm_stripe_count; i++, loi++){
853                 struct lov_request *req;
854
855                 if (info[i].count == 0)
856                         continue;
857
858                 if (!lov->lov_tgts[loi->loi_ost_idx] || 
859                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
860                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
861                         GOTO(out, rc = -EIO);
862                 }
863
864                 OBD_ALLOC(req, sizeof(*req));
865                 if (req == NULL)
866                         GOTO(out, rc = -ENOMEM);
867
868                 req->rq_oi.oi_oa = obdo_alloc();
869                 if (req->rq_oi.oi_oa == NULL) {
870                         OBD_FREE(req, sizeof(*req));
871                         GOTO(out, rc = -ENOMEM);
872                 }
873
874                 if (oinfo->oi_oa) {
875                         memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
876                                sizeof(*req->rq_oi.oi_oa));
877                 }
878                 req->rq_oi.oi_oa->o_id = loi->loi_id;
879                 req->rq_oi.oi_oa->o_stripe_idx = i;
880
881                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
882                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
883                 if (req->rq_oi.oi_md == NULL) {
884                         obdo_free(req->rq_oi.oi_oa);
885                         OBD_FREE(req, sizeof(*req));
886                         GOTO(out, rc = -ENOMEM);
887                 }
888
889                 req->rq_idx = loi->loi_ost_idx;
890                 req->rq_stripe = i;
891
892                 /* XXX LOV STACKING */
893                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
894                 req->rq_oi.oi_md->lsm_object_gr = oinfo->oi_md->lsm_object_gr;
895                 req->rq_oabufs = info[i].count;
896                 req->rq_pgaidx = shift;
897                 shift += req->rq_oabufs;
898
899                 /* remember the index for sort brw_page array */
900                 info[i].index = req->rq_pgaidx;
901
902                 lov_set_add_req(req, set);
903         }
904         if (!set->set_count)
905                 GOTO(out, rc = -EIO);
906
907         /* rotate & sort the brw_page array */
908         for (i = 0; i < oa_bufs; i++) {
909                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
910
911                 shift = info[stripe].index + info[stripe].off;
912                 LASSERT(shift < oa_bufs);
913                 set->set_pga[shift] = pga[i];
914                 lov_stripe_offset(oinfo->oi_md, pga[i].off, stripe,
915                                   &set->set_pga[shift].off);
916                 info[stripe].off++;
917         }
918 out:
919         if (info)
920                 OBD_FREE(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
921
922         if (rc == 0)
923                 *reqset = set;
924         else
925                 lov_fini_brw_set(set);
926
927         RETURN(rc);
928 }
929
930 int lov_fini_getattr_set(struct lov_request_set *set)
931 {
932         int rc = 0;
933         ENTRY;
934
935         if (set == NULL)
936                 RETURN(0);
937         LASSERT(set->set_exp);
938         if (set->set_completes)
939                 rc = common_attr_done(set);
940
941         if (atomic_dec_and_test(&set->set_refcount))
942                 lov_finish_set(set);
943
944         RETURN(rc);
945 }
946
947 /* The callback for osc_getattr_async that finilizes a request info when a
948  * response is recieved. */
949 static int cb_getattr_update(struct obd_info *oinfo, int rc)
950 {
951         struct lov_request *lovreq;
952         lovreq = container_of(oinfo, struct lov_request, rq_oi);
953         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
954 }
955
956 int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
957                          struct lov_request_set **reqset)
958 {
959         struct lov_request_set *set;
960         struct lov_oinfo *loi = NULL;
961         struct lov_obd *lov = &exp->exp_obd->u.lov;
962         int rc = 0, i;
963         ENTRY;
964
965         OBD_ALLOC(set, sizeof(*set));
966         if (set == NULL)
967                 RETURN(-ENOMEM);
968         lov_init_set(set);
969
970         set->set_exp = exp;
971         set->set_oi = oinfo;
972
973         loi = oinfo->oi_md->lsm_oinfo;
974         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++, loi++) {
975                 struct lov_request *req;
976
977                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
978                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
979                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
980                         continue;
981                 }
982
983                 OBD_ALLOC(req, sizeof(*req));
984                 if (req == NULL)
985                         GOTO(out_set, rc = -ENOMEM);
986
987                 req->rq_stripe = i;
988                 req->rq_idx = loi->loi_ost_idx;
989
990                 req->rq_oi.oi_oa = obdo_alloc();
991                 if (req->rq_oi.oi_oa == NULL) {
992                         OBD_FREE(req, sizeof(*req));
993                         GOTO(out_set, rc = -ENOMEM);
994                 }
995                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
996                        sizeof(*req->rq_oi.oi_oa));
997                 req->rq_oi.oi_oa->o_id = loi->loi_id;
998                 req->rq_oi.oi_cb_up = cb_getattr_update;
999
1000                 lov_set_add_req(req, set);
1001         }
1002         if (!set->set_count)
1003                 GOTO(out_set, rc = -EIO);
1004         *reqset = set;
1005         RETURN(rc);
1006 out_set:
1007         lov_fini_getattr_set(set);
1008         RETURN(rc);
1009 }
1010
1011 int lov_fini_destroy_set(struct lov_request_set *set)
1012 {
1013         ENTRY;
1014
1015         if (set == NULL)
1016                 RETURN(0);
1017         LASSERT(set->set_exp);
1018         if (set->set_completes) {
1019                 /* FIXME update qos data here */
1020         }
1021
1022         if (atomic_dec_and_test(&set->set_refcount))
1023                 lov_finish_set(set);
1024
1025         RETURN(0);
1026 }
1027
1028 int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
1029                          struct obdo *src_oa, struct lov_stripe_md *lsm,
1030                          struct obd_trans_info *oti,
1031                          struct lov_request_set **reqset)
1032 {
1033         struct lov_request_set *set;
1034         struct lov_oinfo *loi = NULL;
1035         struct lov_obd *lov = &exp->exp_obd->u.lov;
1036         int rc = 0, i;
1037         ENTRY;
1038
1039         OBD_ALLOC(set, sizeof(*set));
1040         if (set == NULL)
1041                 RETURN(-ENOMEM);
1042         lov_init_set(set);
1043
1044         set->set_exp = exp;
1045         set->set_oi = oinfo;
1046         set->set_oi->oi_md = lsm;
1047         set->set_oi->oi_oa = src_oa;
1048         set->set_oti = oti;
1049         if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
1050                 set->set_cookies = oti->oti_logcookies;
1051
1052         loi = lsm->lsm_oinfo;
1053         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1054                 struct lov_request *req;
1055
1056                 if (!lov->lov_tgts[loi->loi_ost_idx] || 
1057                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1058                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1059                         continue;
1060                 }
1061
1062                 OBD_ALLOC(req, sizeof(*req));
1063                 if (req == NULL)
1064                         GOTO(out_set, rc = -ENOMEM);
1065
1066                 req->rq_stripe = i;
1067                 req->rq_idx = loi->loi_ost_idx;
1068
1069                 req->rq_oi.oi_oa = obdo_alloc();
1070                 if (req->rq_oi.oi_oa == NULL) {
1071                         OBD_FREE(req, sizeof(*req));
1072                         GOTO(out_set, rc = -ENOMEM);
1073                 }
1074                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1075                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1076                 lov_set_add_req(req, set);
1077         }
1078         if (!set->set_count)
1079                 GOTO(out_set, rc = -EIO);
1080         *reqset = set;
1081         RETURN(rc);
1082 out_set:
1083         lov_fini_destroy_set(set);
1084         RETURN(rc);
1085 }
1086
1087 int lov_fini_setattr_set(struct lov_request_set *set)
1088 {
1089         int rc = 0;
1090         ENTRY;
1091
1092         if (set == NULL)
1093                 RETURN(0);
1094         LASSERT(set->set_exp);
1095         if (set->set_completes) {
1096                 rc = common_attr_done(set);
1097                 /* FIXME update qos data here */
1098         }
1099
1100         if (atomic_dec_and_test(&set->set_refcount))
1101                 lov_finish_set(set);
1102         RETURN(rc);
1103 }
1104
1105 int lov_update_setattr_set(struct lov_request_set *set,
1106                            struct lov_request *req, int rc)
1107 {
1108         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1109         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1110         ENTRY;
1111
1112         lov_update_set(set, req, rc);
1113
1114         /* grace error on inactive ost */
1115         if (rc && !(lov->lov_tgts[req->rq_idx] && 
1116                     lov->lov_tgts[req->rq_idx]->ltd_active))
1117                 rc = 0;
1118
1119         if (rc == 0) {
1120                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME)
1121                         lsm->lsm_oinfo[req->rq_stripe].loi_lvb.lvb_ctime =
1122                                 req->rq_oi.oi_oa->o_ctime;
1123                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME)
1124                         lsm->lsm_oinfo[req->rq_stripe].loi_lvb.lvb_mtime =
1125                                 req->rq_oi.oi_oa->o_mtime;
1126                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME)
1127                         lsm->lsm_oinfo[req->rq_stripe].loi_lvb.lvb_atime =
1128                                 req->rq_oi.oi_oa->o_atime;
1129         }
1130
1131         RETURN(rc);
1132 }
1133
1134 /* The callback for osc_setattr_async that finilizes a request info when a
1135  * response is recieved. */
1136 static int cb_setattr_update(struct obd_info *oinfo, int rc)
1137 {
1138         struct lov_request *lovreq;
1139         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1140         return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
1141 }
1142
1143 int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
1144                          struct obd_trans_info *oti,
1145                          struct lov_request_set **reqset)
1146 {
1147         struct lov_request_set *set;
1148         struct lov_oinfo *loi = NULL;
1149         struct lov_obd *lov = &exp->exp_obd->u.lov;
1150         int rc = 0, i;
1151         ENTRY;
1152
1153         OBD_ALLOC(set, sizeof(*set));
1154         if (set == NULL)
1155                 RETURN(-ENOMEM);
1156         lov_init_set(set);
1157
1158         set->set_exp = exp;
1159         set->set_oti = oti;
1160         set->set_oi = oinfo;
1161         if (oti != NULL && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
1162                 set->set_cookies = oti->oti_logcookies;
1163
1164         loi = oinfo->oi_md->lsm_oinfo;
1165         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++, loi++) {
1166                 struct lov_request *req;
1167
1168                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1169                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1170                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1171                         continue;
1172                 }
1173
1174                 OBD_ALLOC(req, sizeof(*req));
1175                 if (req == NULL)
1176                         GOTO(out_set, rc = -ENOMEM);
1177                 req->rq_stripe = i;
1178                 req->rq_idx = loi->loi_ost_idx;
1179
1180                 req->rq_oi.oi_oa = obdo_alloc();
1181                 if (req->rq_oi.oi_oa == NULL) {
1182                         OBD_FREE(req, sizeof(*req));
1183                         GOTO(out_set, rc = -ENOMEM);
1184                 }
1185                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1186                        sizeof(*req->rq_oi.oi_oa));
1187                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1188                 req->rq_oi.oi_oa->o_stripe_idx = i;
1189                 req->rq_oi.oi_cb_up = cb_setattr_update;
1190                 req->rq_rqset = set;
1191
1192                 if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
1193                         int off = lov_stripe_offset(oinfo->oi_md,
1194                                                     oinfo->oi_oa->o_size, i,
1195                                                     &req->rq_oi.oi_oa->o_size);
1196
1197                         if (off < 0 && req->rq_oi.oi_oa->o_size)
1198                                 req->rq_oi.oi_oa->o_size--;
1199
1200                         CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1201                                i, req->rq_oi.oi_oa->o_size,
1202                                oinfo->oi_oa->o_size);
1203                 }
1204                 lov_set_add_req(req, set);
1205         }
1206         if (!set->set_count)
1207                 GOTO(out_set, rc = -EIO);
1208         *reqset = set;
1209         RETURN(rc);
1210 out_set:
1211         lov_fini_setattr_set(set);
1212         RETURN(rc);
1213 }
1214
1215 int lov_fini_punch_set(struct lov_request_set *set)
1216 {
1217         int rc = 0;
1218         ENTRY;
1219
1220         if (set == NULL)
1221                 RETURN(0);
1222         LASSERT(set->set_exp);
1223         if (set->set_completes) {
1224                 rc = -EIO;
1225                 /* FIXME update qos data here */
1226                 if (set->set_success)
1227                         rc = common_attr_done(set);
1228         }
1229
1230         if (atomic_dec_and_test(&set->set_refcount))
1231                 lov_finish_set(set);
1232
1233         RETURN(rc);
1234 }
1235
1236 int lov_update_punch_set(struct lov_request_set *set,
1237                            struct lov_request *req, int rc)
1238 {
1239         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1240         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1241         ENTRY;
1242
1243         lov_update_set(set, req, rc);
1244
1245         /* grace error on inactive ost */
1246         if (rc && !lov->lov_tgts[req->rq_idx]->ltd_active)
1247                 rc = 0;
1248
1249         if (rc == 0) {
1250                 lov_stripe_lock(lsm);
1251                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS) {
1252                         lsm->lsm_oinfo[req->rq_stripe].loi_lvb.lvb_blocks =
1253                                 req->rq_oi.oi_oa->o_blocks;
1254                 }
1255
1256                 /* Do we need to update lvb_size here? It needn't because
1257                  * it have been done in ll_truncate(). -jay */
1258                 lov_stripe_unlock(lsm);
1259         }
1260
1261         RETURN(rc);
1262 }
1263
1264 /* The callback for osc_punch that finilizes a request info when a response
1265  * is recieved. */
1266 static int cb_update_punch(struct obd_info *oinfo, int rc)
1267 {
1268         struct lov_request *lovreq;
1269         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1270         return lov_update_punch_set(lovreq->rq_rqset, lovreq, rc);
1271 }
1272
1273 int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo,
1274                        struct obd_trans_info *oti,
1275                        struct lov_request_set **reqset)
1276 {
1277         struct lov_request_set *set;
1278         struct lov_oinfo *loi = NULL;
1279         struct lov_obd *lov = &exp->exp_obd->u.lov;
1280         int rc = 0, i;
1281         ENTRY;
1282
1283         OBD_ALLOC(set, sizeof(*set));
1284         if (set == NULL)
1285                 RETURN(-ENOMEM);
1286         lov_init_set(set);
1287
1288         set->set_oi = oinfo;
1289         set->set_exp = exp;
1290
1291         loi = oinfo->oi_md->lsm_oinfo;
1292         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++, loi++) {
1293                 struct lov_request *req;
1294                 obd_off rs, re;
1295
1296                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1297                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1298                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1299                         continue;
1300                 }
1301
1302                 if (!lov_stripe_intersects(oinfo->oi_md, i,
1303                                            oinfo->oi_policy.l_extent.start,
1304                                            oinfo->oi_policy.l_extent.end,
1305                                            &rs, &re))
1306                         continue;
1307
1308                 OBD_ALLOC(req, sizeof(*req));
1309                 if (req == NULL)
1310                         GOTO(out_set, rc = -ENOMEM);
1311                 req->rq_stripe = i;
1312                 req->rq_idx = loi->loi_ost_idx;
1313
1314                 req->rq_oi.oi_oa = obdo_alloc();
1315                 if (req->rq_oi.oi_oa == NULL) {
1316                         OBD_FREE(req, sizeof(*req));
1317                         GOTO(out_set, rc = -ENOMEM);
1318                 }
1319                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1320                        sizeof(*req->rq_oi.oi_oa));
1321                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1322                 req->rq_oi.oi_oa->o_stripe_idx = i;
1323                 req->rq_oi.oi_cb_up = cb_update_punch;
1324                 req->rq_rqset = set;
1325
1326                 req->rq_oi.oi_policy.l_extent.start = rs;
1327                 req->rq_oi.oi_policy.l_extent.end = re;
1328                 req->rq_oi.oi_policy.l_extent.gid = -1;
1329
1330                 lov_set_add_req(req, set);
1331         }
1332         if (!set->set_count)
1333                 GOTO(out_set, rc = -EIO);
1334         *reqset = set;
1335         RETURN(rc);
1336 out_set:
1337         lov_fini_punch_set(set);
1338         RETURN(rc);
1339 }
1340
1341 int lov_fini_sync_set(struct lov_request_set *set)
1342 {
1343         int rc = 0;
1344         ENTRY;
1345
1346         if (set == NULL)
1347                 RETURN(0);
1348         LASSERT(set->set_exp);
1349         if (set->set_completes) {
1350                 if (!set->set_success)
1351                         rc = -EIO;
1352                 /* FIXME update qos data here */
1353         }
1354
1355         if (atomic_dec_and_test(&set->set_refcount))
1356                 lov_finish_set(set);
1357
1358         RETURN(rc);
1359 }
1360
1361 int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
1362                       struct obdo *src_oa, struct lov_stripe_md *lsm,
1363                       obd_off start, obd_off end,
1364                       struct lov_request_set **reqset)
1365 {
1366         struct lov_request_set *set;
1367         struct lov_oinfo *loi = NULL;
1368         struct lov_obd *lov = &exp->exp_obd->u.lov;
1369         int rc = 0, i;
1370         ENTRY;
1371
1372         OBD_ALLOC(set, sizeof(*set));
1373         if (set == NULL)
1374                 RETURN(-ENOMEM);
1375         lov_init_set(set);
1376
1377         set->set_exp = exp;
1378         set->set_oi = oinfo;
1379         set->set_oi->oi_md = lsm;
1380         set->set_oi->oi_oa = src_oa;
1381
1382         loi = lsm->lsm_oinfo;
1383         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1384                 struct lov_request *req;
1385                 obd_off rs, re;
1386
1387                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1388                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1389                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1390                         continue;
1391                 }
1392
1393                 if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
1394                         continue;
1395
1396                 OBD_ALLOC(req, sizeof(*req));
1397                 if (req == NULL)
1398                         GOTO(out_set, rc = -ENOMEM);
1399                 req->rq_stripe = i;
1400                 req->rq_idx = loi->loi_ost_idx;
1401
1402                 req->rq_oi.oi_oa = obdo_alloc();
1403                 if (req->rq_oi.oi_oa == NULL) {
1404                         OBD_FREE(req, sizeof(*req));
1405                         GOTO(out_set, rc = -ENOMEM);
1406                 }
1407                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1408                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1409                 req->rq_oi.oi_oa->o_stripe_idx = i;
1410
1411                 req->rq_oi.oi_policy.l_extent.start = rs;
1412                 req->rq_oi.oi_policy.l_extent.end = re;
1413                 req->rq_oi.oi_policy.l_extent.gid = -1;
1414
1415                 lov_set_add_req(req, set);
1416         }
1417         if (!set->set_count)
1418                 GOTO(out_set, rc = -EIO);
1419         *reqset = set;
1420         RETURN(rc);
1421 out_set:
1422         lov_fini_sync_set(set);
1423         RETURN(rc);
1424 }
1425
/* All-ones value of a __u64. */
#define LOV_U64_MAX ((__u64)~0ULL)

/*
 * Saturating accumulate: adds add to tot, except that tot is set to
 * LOV_U64_MAX when the unsigned sum wraps around.  Both arguments are
 * evaluated more than once, so they must be free of side effects.
 */
#define LOV_SUM_MAX(tot, add)                                           \
        do {                                                            \
                (tot) = ((tot) + (add) < (tot)) ?                       \
                        LOV_U64_MAX : (tot) + (add);                    \
        } while (0)
1434
1435 int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success)
1436 {
1437         ENTRY;
1438
1439         if (success) {
1440                 __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov, 0);
1441
1442                 if (osfs->os_files != LOV_U64_MAX)
1443                         do_div(osfs->os_files, expected_stripes);
1444                 if (osfs->os_ffree != LOV_U64_MAX)
1445                         do_div(osfs->os_ffree, expected_stripes);
1446
1447                 spin_lock(&obd->obd_osfs_lock);
1448                 memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
1449                 obd->obd_osfs_age = get_jiffies_64();
1450                 spin_unlock(&obd->obd_osfs_lock);
1451                 RETURN(0);
1452         }
1453
1454         RETURN(-EIO);
1455 }
1456
1457 int lov_fini_statfs_set(struct lov_request_set *set)
1458 {
1459         int rc = 0;
1460         ENTRY;
1461
1462         if (set == NULL)
1463                 RETURN(0);
1464
1465         if (set->set_completes) {
1466                 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
1467                                      set->set_success);
1468         }
1469
1470         if (atomic_dec_and_test(&set->set_refcount))
1471                 lov_finish_set(set);
1472
1473         RETURN(rc);
1474 }
1475
1476 void lov_update_statfs(struct obd_device *obd, struct obd_statfs *osfs,
1477                        struct obd_statfs *lov_sfs, int success)
1478 {
1479         spin_lock(&obd->obd_osfs_lock);
1480         memcpy(&obd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
1481         obd->obd_osfs_age = get_jiffies_64();
1482         spin_unlock(&obd->obd_osfs_lock);
1483
1484         if (success == 0) {
1485                 memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
1486         } else {
1487 #ifdef MIN_DF
1488                 /* Sandia requested that df (and so, statfs) only
1489                    returned minimal available space on
1490                    a single OST, so people would be able to
1491                    write this much data guaranteed. */
1492                 if (osfs->os_bavail > lov_sfs->os_bavail) {
1493                         /* Presumably if new bavail is smaller,
1494                            new bfree is bigger as well */
1495                         osfs->os_bfree = lov_sfs->os_bfree;
1496                         osfs->os_bavail = lov_sfs->os_bavail;
1497                 }
1498 #else
1499                 osfs->os_bfree += lov_sfs->os_bfree;
1500                 osfs->os_bavail += lov_sfs->os_bavail;
1501 #endif
1502                 osfs->os_blocks += lov_sfs->os_blocks;
1503                 /* XXX not sure about this one - depends on policy.
1504                  *   - could be minimum if we always stripe on all OBDs
1505                  *     (but that would be wrong for any other policy,
1506                  *     if one of the OBDs has no more objects left)
1507                  *   - could be sum if we stripe whole objects
1508                  *   - could be average, just to give a nice number
1509                  *
1510                  * To give a "reasonable" (if not wholly accurate)
1511                  * number, we divide the total number of free objects
1512                  * by expected stripe count (watch out for overflow).
1513                  */
1514                 LOV_SUM_MAX(osfs->os_files, lov_sfs->os_files);
1515                 LOV_SUM_MAX(osfs->os_ffree, lov_sfs->os_ffree);
1516         }
1517 }
1518
1519 /* The callback for osc_statfs_async that finilizes a request info when a
1520  * response is recieved. */
1521 static int cb_statfs_update(struct obd_info *oinfo, int rc)
1522 {
1523         struct lov_request *lovreq;
1524         struct obd_statfs *osfs, *lov_sfs;
1525         struct obd_device *obd;
1526         struct lov_obd *lov;
1527         int success;
1528         ENTRY;
1529
1530         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1531         lov = &lovreq->rq_rqset->set_obd->u.lov;
1532         obd = class_exp2obd(lov->lov_tgts[lovreq->rq_idx]->ltd_exp);
1533
1534         osfs = lovreq->rq_rqset->set_oi->oi_osfs;
1535         lov_sfs = oinfo->oi_osfs;
1536
1537         success = lovreq->rq_rqset->set_success;
1538
1539         /* XXX: the same is done in lov_update_common_set, however
1540            lovset->set_exp is not initialized. */
1541         lov_update_set(lovreq->rq_rqset, lovreq, rc);
1542         if (rc) {
1543                 if (rc && !(lov->lov_tgts[lovreq->rq_idx] &&
1544                             lov->lov_tgts[lovreq->rq_idx]->ltd_active))
1545                         rc = 0;
1546                 RETURN(rc);
1547         }
1548
1549         lov_update_statfs(obd, osfs, lov_sfs, success);
1550         qos_update(lov);
1551
1552         RETURN(0);
1553 }
1554
1555 int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
1556                         struct lov_request_set **reqset)
1557 {
1558         struct lov_request_set *set;
1559         struct lov_obd *lov = &obd->u.lov;
1560         int rc = 0, i;
1561         ENTRY;
1562
1563         OBD_ALLOC(set, sizeof(*set));
1564         if (set == NULL)
1565                 RETURN(-ENOMEM);
1566         lov_init_set(set);
1567
1568         set->set_obd = obd;
1569         set->set_oi = oinfo;
1570
1571         /* We only get block data from the OBD */
1572         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1573                 struct lov_request *req;
1574
1575                 if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_active) {
1576                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
1577                         continue;
1578                 }
1579
1580                 OBD_ALLOC(req, sizeof(*req));
1581                 if (req == NULL)
1582                         GOTO(out_set, rc = -ENOMEM);
1583
1584                 OBD_ALLOC(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs));
1585                 if (req->rq_oi.oi_osfs == NULL) {
1586                         OBD_FREE(req, sizeof(*req));
1587                         GOTO(out_set, rc = -ENOMEM);
1588                 }
1589
1590                 req->rq_idx = i;
1591                 req->rq_oi.oi_cb_up = cb_statfs_update;
1592                 req->rq_rqset = set;
1593
1594                 lov_set_add_req(req, set);
1595         }
1596         if (!set->set_count)
1597                 GOTO(out_set, rc = -EIO);
1598         *reqset = set;
1599         RETURN(rc);
1600 out_set:
1601         lov_fini_statfs_set(set);
1602         RETURN(rc);
1603 }