Whamcloud - gitweb
* all capabilities of opened inode should always renew because many file
[fs/lustre-release.git] / lustre / lov / lov_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of the Lustre file system, http://www.lustre.org
7  *   Lustre is a trademark of Cluster File Systems, Inc.
8  *
9  *   You may have signed or agreed to another license before downloading
10  *   this software.  If so, you are bound by the terms and conditions
11  *   of that agreement, and the following does not apply to you.  See the
12  *   LICENSE file included with this distribution for more information.
13  *
14  *   If you did not agree to a different license, then this copy of Lustre
15  *   is open source software; you can redistribute it and/or modify it
16  *   under the terms of version 2 of the GNU General Public License as
17  *   published by the Free Software Foundation.
18  *
19  *   In either case, Lustre is distributed in the hope that it will be
20  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
21  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22  *   license text for more details.
23  */
24
25 #ifndef EXPORT_SYMTAB
26 # define EXPORT_SYMTAB
27 #endif
28 #define DEBUG_SUBSYSTEM S_LOV
29
30 #ifdef __KERNEL__
31 #include <libcfs/libcfs.h>
32 #else
33 #include <liblustre.h>
34 #endif
35
36 #include <obd_class.h>
37 #include <obd_lov.h>
38 #include <lustre/lustre_idl.h>
39
40 #include "lov_internal.h"
41
42 static void lov_init_set(struct lov_request_set *set)
43 {
44         set->set_count = 0;
45         set->set_completes = 0;
46         set->set_success = 0;
47         set->set_cookies = 0;
48         CFS_INIT_LIST_HEAD(&set->set_list);
49         atomic_set(&set->set_refcount, 1);
50 }
51
52 static void lov_finish_set(struct lov_request_set *set)
53 {
54         struct list_head *pos, *n;
55         ENTRY;
56
57         LASSERT(set);
58         list_for_each_safe(pos, n, &set->set_list) {
59                 struct lov_request *req = list_entry(pos, struct lov_request,
60                                                      rq_link);
61                 list_del_init(&req->rq_link);
62
63                 if (req->rq_oi.oi_oa)
64                         obdo_free(req->rq_oi.oi_oa);
65                 if (req->rq_oi.oi_md)
66                         OBD_FREE(req->rq_oi.oi_md, req->rq_buflen);
67                 if (req->rq_oi.oi_osfs)
68                         OBD_FREE(req->rq_oi.oi_osfs,
69                                  sizeof(*req->rq_oi.oi_osfs));
70                 OBD_FREE(req, sizeof(*req));
71         }
72
73         if (set->set_pga) {
74                 int len = set->set_oabufs * sizeof(*set->set_pga);
75                 OBD_FREE(set->set_pga, len);
76         }
77         if (set->set_lockh)
78                 lov_llh_put(set->set_lockh);
79
80         OBD_FREE(set, sizeof(*set));
81         EXIT;
82 }
83
84 void lov_update_set(struct lov_request_set *set,
85                     struct lov_request *req, int rc)
86 {
87         req->rq_complete = 1;
88         req->rq_rc = rc;
89
90         set->set_completes++;
91         if (rc == 0)
92                 set->set_success++;
93 }
94
95 int lov_update_common_set(struct lov_request_set *set,
96                           struct lov_request *req, int rc)
97 {
98         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
99         ENTRY;
100
101         lov_update_set(set, req, rc);
102
103         /* grace error on inactive ost */
104         if (rc && !(lov->lov_tgts[req->rq_idx] && 
105                     lov->lov_tgts[req->rq_idx]->ltd_active))
106                 rc = 0;
107
108         /* FIXME in raid1 regime, should return 0 */
109         RETURN(rc);
110 }
111
112 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
113 {
114         list_add_tail(&req->rq_link, &set->set_list);
115         set->set_count++;
116 }
117
118 int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc)
119 {
120         struct lov_request_set *set = req->rq_rqset;
121         struct lustre_handle *lov_lockhp;
122         struct lov_oinfo *loi;
123         ENTRY;
124
125         LASSERT(set != NULL);
126         LASSERT(set->set_oi != NULL);
127
128         lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
129         loi = &set->set_oi->oi_md->lsm_oinfo[req->rq_stripe];
130
131         /* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set
132          * and that copy can be arbitrarily out of date.
133          *
134          * The LOV API is due for a serious rewriting anyways, and this
135          * can be addressed then. */
136
137         if (rc == ELDLM_OK) {
138                 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
139                 __u64 tmp;
140
141                 LASSERT(lock != NULL);
142                 lov_stripe_lock(set->set_oi->oi_md);
143                 loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo->loi_lvb;
144                 tmp = loi->loi_lvb.lvb_size;
145                 /* Extend KMS up to the end of this lock and no further
146                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
147                 if (tmp > lock->l_policy_data.l_extent.end)
148                         tmp = lock->l_policy_data.l_extent.end + 1;
149                 if (tmp >= loi->loi_kms) {
150                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
151                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
152                         loi->loi_kms = tmp;
153                         loi->loi_kms_valid = 1;
154                 } else {
155                         LDLM_DEBUG(lock, "lock acquired, setting rss="
156                                    LPU64"; leaving kms="LPU64", end="LPU64,
157                                    loi->loi_lvb.lvb_size, loi->loi_kms,
158                                    lock->l_policy_data.l_extent.end);
159                 }
160                 lov_stripe_unlock(set->set_oi->oi_md);
161                 ldlm_lock_allow_match(lock);
162                 LDLM_LOCK_PUT(lock);
163         } else if ((rc == ELDLM_LOCK_ABORTED) &&
164                    (set->set_ei->ei_flags & LDLM_FL_HAS_INTENT)) {
165                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
166                 lov_stripe_lock(set->set_oi->oi_md);
167                 loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo->loi_lvb;
168                 lov_stripe_unlock(set->set_oi->oi_md);
169                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
170                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
171                 rc = ELDLM_OK;
172         } else {
173                 struct obd_export *exp = set->set_exp;
174                 struct lov_obd *lov = &exp->exp_obd->u.lov;
175
176                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
177                 if (lov->lov_tgts[req->rq_idx] && 
178                     lov->lov_tgts[req->rq_idx]->ltd_active) {
179                         CERROR("error: enqueue objid "LPX64" subobj "
180                                 LPX64" on OST idx %d: rc = %d\n",
181                                 set->set_oi->oi_md->lsm_object_id,
182                                 loi->loi_id, loi->loi_ost_idx, rc);
183                 } else {
184                         rc = ELDLM_OK;
185                 }
186         }
187         lov_update_set(set, req, rc);
188         RETURN(rc);
189 }
190
191 /* The callback for osc_enqueue that updates lov info for every OSC request. */
192 static int cb_update_enqueue(struct obd_info *oinfo, int rc)
193 {
194         struct obd_enqueue_info *einfo;
195         struct lov_request *lovreq;
196
197         lovreq = container_of(oinfo, struct lov_request, rq_oi);
198         einfo = lovreq->rq_rqset->set_ei;
199         return lov_update_enqueue_set(lovreq, einfo->ei_mode, rc);
200 }
201
202 static int enqueue_done(struct lov_request_set *set, __u32 mode)
203 {
204         struct lov_request *req;
205         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
206         int rc = 0;
207         ENTRY;
208
209         /* enqueue/match success, just return */
210         if (set->set_completes && set->set_completes == set->set_success)
211                 RETURN(0);
212
213         /* cancel enqueued/matched locks */
214         list_for_each_entry(req, &set->set_list, rq_link) {
215                 struct lustre_handle *lov_lockhp;
216
217                 if (!req->rq_complete || req->rq_rc)
218                         continue;
219
220                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
221                 LASSERT(lov_lockhp);
222                 if (!lustre_handle_is_used(lov_lockhp))
223                         continue;
224
225                 rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp,
226                                 req->rq_oi.oi_md, mode, lov_lockhp);
227                 if (rc && lov->lov_tgts[req->rq_idx] &&
228                     lov->lov_tgts[req->rq_idx]->ltd_active)
229                         CERROR("cancelling obdjid "LPX64" on OST "
230                                "idx %d error: rc = %d\n",
231                                req->rq_oi.oi_md->lsm_object_id,
232                                req->rq_idx, rc);
233         }
234         if (set->set_lockh)
235                 lov_llh_put(set->set_lockh);
236         RETURN(rc);
237 }
238
239 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc)
240 {
241         int ret = 0;
242         ENTRY;
243
244         if (set == NULL)
245                 RETURN(0);
246         LASSERT(set->set_exp);
247         /* Do enqueue_done only for sync requests and if any request
248          * succeeded. */
249         if (!set->set_ei->ei_rqset) {
250                 if (rc)
251                         set->set_completes = 0;
252                 ret = enqueue_done(set, mode);
253         } else if (set->set_lockh)
254                 lov_llh_put(set->set_lockh);
255
256         if (atomic_dec_and_test(&set->set_refcount))
257                 lov_finish_set(set);
258
259         RETURN(rc ? rc : ret);
260 }
261
262 int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
263                          struct obd_enqueue_info *einfo,
264                          struct lov_request_set **reqset)
265 {
266         struct lov_obd *lov = &exp->exp_obd->u.lov;
267         struct lov_request_set *set;
268         int i, rc = 0;
269         struct lov_oinfo *loi;
270         ENTRY;
271
272         OBD_ALLOC(set, sizeof(*set));
273         if (set == NULL)
274                 RETURN(-ENOMEM);
275         lov_init_set(set);
276
277         set->set_exp = exp;
278         set->set_oi = oinfo;
279         set->set_ei = einfo;
280         set->set_lockh = lov_llh_new(oinfo->oi_md);
281         if (set->set_lockh == NULL)
282                 GOTO(out_set, rc = -ENOMEM);
283         oinfo->oi_lockh->cookie = set->set_lockh->llh_handle.h_cookie;
284
285         loi = oinfo->oi_md->lsm_oinfo;
286         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++, loi++) {
287                 struct lov_request *req;
288                 obd_off start, end;
289
290                 if (!lov_stripe_intersects(oinfo->oi_md, i,
291                                            oinfo->oi_policy.l_extent.start,
292                                            oinfo->oi_policy.l_extent.end,
293                                            &start, &end))
294                         continue;
295
296                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
297                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
298                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
299                         continue;
300                 }
301
302                 OBD_ALLOC(req, sizeof(*req));
303                 if (req == NULL)
304                         GOTO(out_set, rc = -ENOMEM);
305
306                 req->rq_buflen = sizeof(*req->rq_oi.oi_md) +
307                         sizeof(struct lov_oinfo);
308                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
309                 if (req->rq_oi.oi_md == NULL) {
310                         OBD_FREE(req, sizeof(*req));
311                         GOTO(out_set, rc = -ENOMEM);
312                 }
313
314                 req->rq_rqset = set;
315                 /* Set lov request specific parameters. */
316                 req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;
317                 req->rq_oi.oi_cb_up = cb_update_enqueue;
318
319                 LASSERT(req->rq_oi.oi_lockh);
320
321                 req->rq_oi.oi_policy.l_extent.gid =
322                         oinfo->oi_policy.l_extent.gid;
323                 req->rq_oi.oi_policy.l_extent.start = start;
324                 req->rq_oi.oi_policy.l_extent.end = end;
325
326                 req->rq_idx = loi->loi_ost_idx;
327                 req->rq_stripe = i;
328
329                 /* XXX LOV STACKING: submd should be from the subobj */
330                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
331                 req->rq_oi.oi_md->lsm_object_gr = oinfo->oi_md->lsm_object_gr;
332                 req->rq_oi.oi_md->lsm_stripe_count = 0;
333                 req->rq_oi.oi_md->lsm_oinfo->loi_kms_valid =
334                         loi->loi_kms_valid;
335                 req->rq_oi.oi_md->lsm_oinfo->loi_kms = loi->loi_kms;
336                 req->rq_oi.oi_md->lsm_oinfo->loi_lvb = loi->loi_lvb;
337
338                 lov_set_add_req(req, set);
339         }
340         if (!set->set_count)
341                 GOTO(out_set, rc = -EIO);
342         *reqset = set;
343         RETURN(0);
344 out_set:
345         lov_fini_enqueue_set(set, einfo->ei_mode, rc);
346         RETURN(rc);
347 }
348
349 int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
350                          int rc)
351 {
352         int ret = rc;
353         ENTRY;
354
355         if (rc == 1)
356                 ret = 0;
357         else if (rc == 0)
358                 ret = 1;
359         lov_update_set(set, req, ret);
360         RETURN(rc);
361 }
362
363 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
364 {
365         int rc = 0;
366         ENTRY;
367
368         if (set == NULL)
369                 RETURN(0);
370         LASSERT(set->set_exp);
371         rc = enqueue_done(set, mode);
372         if ((set->set_count == set->set_success) &&
373             (flags & LDLM_FL_TEST_LOCK))
374                 lov_llh_put(set->set_lockh);
375
376         if (atomic_dec_and_test(&set->set_refcount))
377                 lov_finish_set(set);
378
379         RETURN(rc);
380 }
381
382 int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo,
383                        struct lov_stripe_md *lsm, ldlm_policy_data_t *policy,
384                        __u32 mode, struct lustre_handle *lockh,
385                        struct lov_request_set **reqset)
386 {
387         struct lov_obd *lov = &exp->exp_obd->u.lov;
388         struct lov_request_set *set;
389         int i, rc = 0;
390         struct lov_oinfo *loi;
391         ENTRY;
392
393         OBD_ALLOC(set, sizeof(*set));
394         if (set == NULL)
395                 RETURN(-ENOMEM);
396         lov_init_set(set);
397
398         set->set_exp = exp;
399         set->set_oi = oinfo;
400         set->set_oi->oi_md = lsm;
401         set->set_lockh = lov_llh_new(lsm);
402         if (set->set_lockh == NULL)
403                 GOTO(out_set, rc = -ENOMEM);
404         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
405
406         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++, loi++){
407                 struct lov_request *req;
408                 obd_off start, end;
409
410                 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
411                                            policy->l_extent.end, &start, &end))
412                         continue;
413
414                 /* FIXME raid1 should grace this error */
415                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
416                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
417                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
418                         GOTO(out_set, rc = -EIO);
419                 }
420
421                 OBD_ALLOC(req, sizeof(*req));
422                 if (req == NULL)
423                         GOTO(out_set, rc = -ENOMEM);
424
425                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
426                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
427                 if (req->rq_oi.oi_md == NULL) {
428                         OBD_FREE(req, sizeof(*req));
429                         GOTO(out_set, rc = -ENOMEM);
430                 }
431
432                 req->rq_oi.oi_policy.l_extent.start = start;
433                 req->rq_oi.oi_policy.l_extent.end = end;
434                 req->rq_oi.oi_policy.l_extent.gid = policy->l_extent.gid;
435
436                 req->rq_idx = loi->loi_ost_idx;
437                 req->rq_stripe = i;
438
439                 /* XXX LOV STACKING: submd should be from the subobj */
440                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
441                 req->rq_oi.oi_md->lsm_object_gr = lsm->lsm_object_gr;
442                 req->rq_oi.oi_md->lsm_stripe_count = 0;
443
444                 lov_set_add_req(req, set);
445         }
446         if (!set->set_count)
447                 GOTO(out_set, rc = -EIO);
448         *reqset = set;
449         RETURN(rc);
450 out_set:
451         lov_fini_match_set(set, mode, 0);
452         RETURN(rc);
453 }
454
455 int lov_fini_cancel_set(struct lov_request_set *set)
456 {
457         int rc = 0;
458         ENTRY;
459
460         if (set == NULL)
461                 RETURN(0);
462
463         LASSERT(set->set_exp);
464         if (set->set_lockh)
465                 lov_llh_put(set->set_lockh);
466
467         if (atomic_dec_and_test(&set->set_refcount))
468                 lov_finish_set(set);
469
470         RETURN(rc);
471 }
472
473 int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,
474                         struct lov_stripe_md *lsm, __u32 mode,
475                         struct lustre_handle *lockh,
476                         struct lov_request_set **reqset)
477 {
478         struct lov_request_set *set;
479         int i, rc = 0;
480         struct lov_oinfo *loi;
481         ENTRY;
482
483         OBD_ALLOC(set, sizeof(*set));
484         if (set == NULL)
485                 RETURN(-ENOMEM);
486         lov_init_set(set);
487
488         set->set_exp = exp;
489         set->set_oi = oinfo;
490         set->set_oi->oi_md = lsm;
491         set->set_lockh = lov_handle2llh(lockh);
492         if (set->set_lockh == NULL) {
493                 CERROR("LOV: invalid lov lock handle %p\n", lockh);
494                 GOTO(out_set, rc = -EINVAL);
495         }
496         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
497
498         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++, loi++){
499                 struct lov_request *req;
500                 struct lustre_handle *lov_lockhp;
501
502                 lov_lockhp = set->set_lockh->llh_handles + i;
503                 if (!lustre_handle_is_used(lov_lockhp)) {
504                         CDEBUG(D_HA, "lov idx %d subobj "LPX64" no lock?\n",
505                                loi->loi_ost_idx, loi->loi_id);
506                         continue;
507                 }
508
509                 OBD_ALLOC(req, sizeof(*req));
510                 if (req == NULL)
511                         GOTO(out_set, rc = -ENOMEM);
512
513                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
514                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
515                 if (req->rq_oi.oi_md == NULL) {
516                         OBD_FREE(req, sizeof(*req));
517                         GOTO(out_set, rc = -ENOMEM);
518                 }
519
520                 req->rq_idx = loi->loi_ost_idx;
521                 req->rq_stripe = i;
522
523                 /* XXX LOV STACKING: submd should be from the subobj */
524                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
525                 req->rq_oi.oi_md->lsm_object_gr = lsm->lsm_object_gr;
526                 req->rq_oi.oi_md->lsm_stripe_count = 0;
527
528                 lov_set_add_req(req, set);
529         }
530         if (!set->set_count)
531                 GOTO(out_set, rc = -EIO);
532         *reqset = set;
533         RETURN(rc);
534 out_set:
535         lov_fini_cancel_set(set);
536         RETURN(rc);
537 }
538
539 static int create_done(struct obd_export *exp, struct lov_request_set *set,
540                        struct lov_stripe_md **lsmp)
541 {
542         struct lov_obd *lov = &exp->exp_obd->u.lov;
543         struct obd_trans_info *oti = set->set_oti;
544         struct obdo *src_oa = set->set_oi->oi_oa;
545         struct lov_request *req;
546         struct obdo *ret_oa = NULL;
547         int attrset = 0, rc = 0;
548         ENTRY;
549
550         LASSERT(set->set_completes);
551
552         /* try alloc objects on other osts if osc_create fails for
553          * exceptions: RPC failure, ENOSPC, etc */
554         if (set->set_count != set->set_success) {
555                 list_for_each_entry (req, &set->set_list, rq_link) {
556                         if (req->rq_rc == 0)
557                                 continue;
558
559                         set->set_completes--;
560                         req->rq_complete = 0;
561
562                         rc = qos_remedy_create(set, req);
563                         lov_update_create_set(set, req, rc);
564
565                         if (rc)
566                                 break;
567                 }
568         }
569
570         /* no successful creates */
571         if (set->set_success == 0)
572                 GOTO(cleanup, rc);
573
574         /* If there was an explicit stripe set, fail.  Otherwise, we
575          * got some objects and that's not bad. */
576         if (set->set_count != set->set_success) {
577                 if (*lsmp)
578                         GOTO(cleanup, rc);
579                 set->set_count = set->set_success;
580                 qos_shrink_lsm(set);
581         }
582
583         ret_oa = obdo_alloc();
584         if (ret_oa == NULL)
585                 GOTO(cleanup, rc = -ENOMEM);
586
587         list_for_each_entry(req, &set->set_list, rq_link) {
588                 if (!req->rq_complete || req->rq_rc)
589                         continue;
590                 lov_merge_attrs(ret_oa, req->rq_oi.oi_oa,
591                                 req->rq_oi.oi_oa->o_valid, set->set_oi->oi_md,
592                                 req->rq_stripe, &attrset);
593         }
594         if (src_oa->o_valid & OBD_MD_FLSIZE &&
595             ret_oa->o_size != src_oa->o_size) {
596                 CERROR("original size "LPU64" isn't new object size "LPU64"\n",
597                        src_oa->o_size, ret_oa->o_size);
598                 LBUG();
599         }
600         ret_oa->o_id = src_oa->o_id;
601         ret_oa->o_gr = src_oa->o_gr;
602         ret_oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP;
603         memcpy(src_oa, ret_oa, sizeof(*src_oa));
604         obdo_free(ret_oa);
605
606         *lsmp = set->set_oi->oi_md;
607         GOTO(done, rc = 0);
608
609 cleanup:
610         list_for_each_entry(req, &set->set_list, rq_link) {
611                 struct obd_export *sub_exp;
612                 int err = 0;
613
614                 if (!req->rq_complete || req->rq_rc)
615                         continue;
616
617                 sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp;
618                 err = obd_destroy(sub_exp, req->rq_oi.oi_oa, NULL, oti, NULL);
619                 if (err)
620                         CERROR("Failed to uncreate objid "LPX64" subobj "
621                                LPX64" on OST idx %d: rc = %d\n",
622                                src_oa->o_id, req->rq_oi.oi_oa->o_id,
623                                req->rq_idx, rc);
624         }
625         if (*lsmp == NULL)
626                 obd_free_memmd(exp, &set->set_oi->oi_md);
627 done:
628         if (oti && set->set_cookies) {
629                 oti->oti_logcookies = set->set_cookies;
630                 if (!set->set_cookie_sent) {
631                         oti_free_cookies(oti);
632                         src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
633                 } else {
634                         src_oa->o_valid |= OBD_MD_FLCOOKIE;
635                 }
636         }
637         RETURN(rc);
638 }
639
640 int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp)
641 {
642         int rc = 0;
643         ENTRY;
644
645         if (set == NULL)
646                 RETURN(0);
647         LASSERT(set->set_exp);
648         if (set->set_completes)
649                 rc = create_done(set->set_exp, set, lsmp);
650
651         if (atomic_dec_and_test(&set->set_refcount))
652                 lov_finish_set(set);
653
654         RETURN(rc);
655 }
656
657 int lov_update_create_set(struct lov_request_set *set,
658                           struct lov_request *req, int rc)
659 {
660         struct obd_trans_info *oti = set->set_oti;
661         struct lov_stripe_md *lsm = set->set_oi->oi_md;
662         struct lov_oinfo *loi;
663         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
664         ENTRY;
665
666         req->rq_stripe = set->set_success;
667         loi = &lsm->lsm_oinfo[req->rq_stripe];
668
669         if (rc && lov->lov_tgts[req->rq_idx] &&
670             lov->lov_tgts[req->rq_idx]->ltd_active) {
671                 CERROR("error creating fid "LPX64" sub-object"
672                        " on OST idx %d/%d: rc = %d\n",
673                        set->set_oi->oi_oa->o_id, req->rq_idx,
674                        lsm->lsm_stripe_count, rc);
675                 if (rc > 0) {
676                         CERROR("obd_create returned invalid err %d\n", rc);
677                         rc = -EIO;
678                 }
679         }
680         lov_update_set(set, req, rc);
681         if (rc)
682                 RETURN(rc);
683
684         if (oti && oti->oti_objid)
685                 oti->oti_objid[req->rq_idx] = req->rq_oi.oi_oa->o_id;
686
687         loi->loi_id = req->rq_oi.oi_oa->o_id;
688         loi->loi_gr = req->rq_oi.oi_oa->o_gr;
689         loi->loi_ost_idx = req->rq_idx;
690         CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n",
691                lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
692         loi_init(loi);
693
694         if (oti && set->set_cookies)
695                 ++oti->oti_logcookies;
696         if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE)
697                 set->set_cookie_sent++;
698
699         RETURN(0);
700 }
701
702 int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo,
703                         struct lov_stripe_md **lsmp, struct obdo *src_oa,
704                         struct obd_trans_info *oti,
705                         struct lov_request_set **reqset)
706 {
707         struct lov_request_set *set;
708         int rc = 0;
709         ENTRY;
710
711         OBD_ALLOC(set, sizeof(*set));
712         if (set == NULL)
713                 RETURN(-ENOMEM);
714         lov_init_set(set);
715
716         set->set_exp = exp;
717         set->set_oi = oinfo;
718         set->set_oi->oi_md = *lsmp;
719         set->set_oi->oi_oa = src_oa;
720         set->set_oti = oti;
721
722         rc = qos_prep_create(exp, set);
723         if (rc)
724                 lov_fini_create_set(set, lsmp);
725         else
726                 *reqset = set;
727         RETURN(rc);
728 }
729
730 static int common_attr_done(struct lov_request_set *set)
731 {
732         struct list_head *pos;
733         struct lov_request *req;
734         struct obdo *tmp_oa;
735         int rc = 0, attrset = 0;
736         ENTRY;
737
738         LASSERT(set->set_oi != NULL);
739
740         if (set->set_oi->oi_oa == NULL)
741                 RETURN(0);
742
743         if (!set->set_success)
744                 RETURN(-EIO);
745
746         tmp_oa = obdo_alloc();
747         if (tmp_oa == NULL)
748                 GOTO(out, rc = -ENOMEM);
749
750         list_for_each (pos, &set->set_list) {
751                 req = list_entry(pos, struct lov_request, rq_link);
752
753                 if (!req->rq_complete || req->rq_rc)
754                         continue;
755                 if (req->rq_oi.oi_oa->o_valid == 0)   /* inactive stripe */
756                         continue;
757                 lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa,
758                                 req->rq_oi.oi_oa->o_valid,
759                                 set->set_oi->oi_md, req->rq_stripe, &attrset);
760         }
761         if (!attrset) {
762                 CERROR("No stripes had valid attrs\n");
763                 rc = -EIO;
764         }
765         tmp_oa->o_id = set->set_oi->oi_oa->o_id;
766         memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
767 out:
768         if (tmp_oa)
769                 obdo_free(tmp_oa);
770         RETURN(rc);
771
772 }
773
774 static int brw_done(struct lov_request_set *set)
775 {
776         struct lov_stripe_md *lsm = set->set_oi->oi_md;
777         struct lov_oinfo     *loi = NULL;
778         struct list_head *pos;
779         struct lov_request *req;
780         ENTRY;
781
782         list_for_each (pos, &set->set_list) {
783                 req = list_entry(pos, struct lov_request, rq_link);
784
785                 if (!req->rq_complete || req->rq_rc)
786                         continue;
787
788                 loi = &lsm->lsm_oinfo[req->rq_stripe];
789
790                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS)
791                         loi->loi_lvb.lvb_blocks = req->rq_oi.oi_oa->o_blocks;
792         }
793
794         RETURN(0);
795 }
796
797 int lov_fini_brw_set(struct lov_request_set *set)
798 {
799         int rc = 0;
800         ENTRY;
801
802         if (set == NULL)
803                 RETURN(0);
804         LASSERT(set->set_exp);
805         if (set->set_completes) {
806                 rc = brw_done(set);
807                 /* FIXME update qos data here */
808         }
809         if (atomic_dec_and_test(&set->set_refcount))
810                 lov_finish_set(set);
811
812         RETURN(rc);
813 }
814
815 int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
816                      obd_count oa_bufs, struct brw_page *pga,
817                      struct obd_trans_info *oti,
818                      struct lov_request_set **reqset)
819 {
820         struct {
821                 obd_count       index;
822                 obd_count       count;
823                 obd_count       off;
824         } *info = NULL;
825         struct lov_request_set *set;
826         struct lov_oinfo *loi = NULL;
827         struct lov_obd *lov = &exp->exp_obd->u.lov;
828         int rc = 0, i, shift;
829         ENTRY;
830
831         OBD_ALLOC(set, sizeof(*set));
832         if (set == NULL)
833                 RETURN(-ENOMEM);
834         lov_init_set(set);
835
836         set->set_exp = exp;
837         set->set_oti = oti;
838         set->set_oi = oinfo;
839         set->set_oabufs = oa_bufs;
840         OBD_ALLOC(set->set_pga, oa_bufs * sizeof(*set->set_pga));
841         if (!set->set_pga)
842                 GOTO(out, rc = -ENOMEM);
843
844         OBD_ALLOC(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
845         if (!info)
846                 GOTO(out, rc = -ENOMEM);
847
848         /* calculate the page count for each stripe */
849         for (i = 0; i < oa_bufs; i++) {
850                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
851                 info[stripe].count++;
852         }
853
854         /* alloc and initialize lov request */
855         shift = 0;
856         for (i = 0, loi = oinfo->oi_md->lsm_oinfo;
857              i < oinfo->oi_md->lsm_stripe_count; i++, loi++){
858                 struct lov_request *req;
859
860                 if (info[i].count == 0)
861                         continue;
862
863                 if (!lov->lov_tgts[loi->loi_ost_idx] || 
864                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
865                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
866                         GOTO(out, rc = -EIO);
867                 }
868
869                 OBD_ALLOC(req, sizeof(*req));
870                 if (req == NULL)
871                         GOTO(out, rc = -ENOMEM);
872
873                 req->rq_oi.oi_oa = obdo_alloc();
874                 if (req->rq_oi.oi_oa == NULL) {
875                         OBD_FREE(req, sizeof(*req));
876                         GOTO(out, rc = -ENOMEM);
877                 }
878
879                 if (oinfo->oi_oa) {
880                         memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
881                                sizeof(*req->rq_oi.oi_oa));
882                 }
883                 req->rq_oi.oi_oa->o_id = loi->loi_id;
884                 req->rq_oi.oi_oa->o_stripe_idx = i;
885
886                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
887                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
888                 if (req->rq_oi.oi_md == NULL) {
889                         obdo_free(req->rq_oi.oi_oa);
890                         OBD_FREE(req, sizeof(*req));
891                         GOTO(out, rc = -ENOMEM);
892                 }
893
894                 req->rq_idx = loi->loi_ost_idx;
895                 req->rq_stripe = i;
896
897                 /* XXX LOV STACKING */
898                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
899                 req->rq_oi.oi_md->lsm_object_gr = oinfo->oi_md->lsm_object_gr;
900                 req->rq_oabufs = info[i].count;
901                 req->rq_pgaidx = shift;
902                 shift += req->rq_oabufs;
903
904                 /* remember the index for sort brw_page array */
905                 info[i].index = req->rq_pgaidx;
906
907                 req->rq_oi.oi_capa = oinfo->oi_capa;
908
909                 lov_set_add_req(req, set);
910         }
911         if (!set->set_count)
912                 GOTO(out, rc = -EIO);
913
914         /* rotate & sort the brw_page array */
915         for (i = 0; i < oa_bufs; i++) {
916                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
917
918                 shift = info[stripe].index + info[stripe].off;
919                 LASSERT(shift < oa_bufs);
920                 set->set_pga[shift] = pga[i];
921                 lov_stripe_offset(oinfo->oi_md, pga[i].off, stripe,
922                                   &set->set_pga[shift].off);
923                 info[stripe].off++;
924         }
925 out:
926         if (info)
927                 OBD_FREE(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
928
929         if (rc == 0)
930                 *reqset = set;
931         else
932                 lov_fini_brw_set(set);
933
934         RETURN(rc);
935 }
936
937 int lov_fini_getattr_set(struct lov_request_set *set)
938 {
939         int rc = 0;
940         ENTRY;
941
942         if (set == NULL)
943                 RETURN(0);
944         LASSERT(set->set_exp);
945         if (set->set_completes)
946                 rc = common_attr_done(set);
947
948         if (atomic_dec_and_test(&set->set_refcount))
949                 lov_finish_set(set);
950
951         RETURN(rc);
952 }
953
954 /* The callback for osc_getattr_async that finilizes a request info when a
955  * response is recieved. */
956 static int cb_getattr_update(struct obd_info *oinfo, int rc)
957 {
958         struct lov_request *lovreq;
959         lovreq = container_of(oinfo, struct lov_request, rq_oi);
960         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
961 }
962
963 int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
964                          struct lov_request_set **reqset)
965 {
966         struct lov_request_set *set;
967         struct lov_oinfo *loi = NULL;
968         struct lov_obd *lov = &exp->exp_obd->u.lov;
969         int rc = 0, i;
970         ENTRY;
971
972         OBD_ALLOC(set, sizeof(*set));
973         if (set == NULL)
974                 RETURN(-ENOMEM);
975         lov_init_set(set);
976
977         set->set_exp = exp;
978         set->set_oi = oinfo;
979
980         loi = oinfo->oi_md->lsm_oinfo;
981         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++, loi++) {
982                 struct lov_request *req;
983
984                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
985                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
986                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
987                         continue;
988                 }
989
990                 OBD_ALLOC(req, sizeof(*req));
991                 if (req == NULL)
992                         GOTO(out_set, rc = -ENOMEM);
993
994                 req->rq_stripe = i;
995                 req->rq_idx = loi->loi_ost_idx;
996
997                 req->rq_oi.oi_oa = obdo_alloc();
998                 if (req->rq_oi.oi_oa == NULL) {
999                         OBD_FREE(req, sizeof(*req));
1000                         GOTO(out_set, rc = -ENOMEM);
1001                 }
1002                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1003                        sizeof(*req->rq_oi.oi_oa));
1004                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1005                 req->rq_oi.oi_cb_up = cb_getattr_update;
1006                 req->rq_oi.oi_capa = oinfo->oi_capa;
1007                 req->rq_rqset = set;
1008
1009                 lov_set_add_req(req, set);
1010         }
1011         if (!set->set_count)
1012                 GOTO(out_set, rc = -EIO);
1013         *reqset = set;
1014         RETURN(rc);
1015 out_set:
1016         lov_fini_getattr_set(set);
1017         RETURN(rc);
1018 }
1019
1020 int lov_fini_destroy_set(struct lov_request_set *set)
1021 {
1022         ENTRY;
1023
1024         if (set == NULL)
1025                 RETURN(0);
1026         LASSERT(set->set_exp);
1027         if (set->set_completes) {
1028                 /* FIXME update qos data here */
1029         }
1030
1031         if (atomic_dec_and_test(&set->set_refcount))
1032                 lov_finish_set(set);
1033
1034         RETURN(0);
1035 }
1036
1037 int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
1038                          struct obdo *src_oa, struct lov_stripe_md *lsm,
1039                          struct obd_trans_info *oti,
1040                          struct lov_request_set **reqset)
1041 {
1042         struct lov_request_set *set;
1043         struct lov_oinfo *loi = NULL;
1044         struct lov_obd *lov = &exp->exp_obd->u.lov;
1045         int rc = 0, i;
1046         ENTRY;
1047
1048         OBD_ALLOC(set, sizeof(*set));
1049         if (set == NULL)
1050                 RETURN(-ENOMEM);
1051         lov_init_set(set);
1052
1053         set->set_exp = exp;
1054         set->set_oi = oinfo;
1055         set->set_oi->oi_md = lsm;
1056         set->set_oi->oi_oa = src_oa;
1057         set->set_oti = oti;
1058         if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
1059                 set->set_cookies = oti->oti_logcookies;
1060
1061         loi = lsm->lsm_oinfo;
1062         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1063                 struct lov_request *req;
1064
1065                 if (!lov->lov_tgts[loi->loi_ost_idx] || 
1066                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1067                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1068                         continue;
1069                 }
1070
1071                 OBD_ALLOC(req, sizeof(*req));
1072                 if (req == NULL)
1073                         GOTO(out_set, rc = -ENOMEM);
1074
1075                 req->rq_stripe = i;
1076                 req->rq_idx = loi->loi_ost_idx;
1077
1078                 req->rq_oi.oi_oa = obdo_alloc();
1079                 if (req->rq_oi.oi_oa == NULL) {
1080                         OBD_FREE(req, sizeof(*req));
1081                         GOTO(out_set, rc = -ENOMEM);
1082                 }
1083                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1084                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1085                 lov_set_add_req(req, set);
1086         }
1087         if (!set->set_count)
1088                 GOTO(out_set, rc = -EIO);
1089         *reqset = set;
1090         RETURN(rc);
1091 out_set:
1092         lov_fini_destroy_set(set);
1093         RETURN(rc);
1094 }
1095
1096 int lov_fini_setattr_set(struct lov_request_set *set)
1097 {
1098         int rc = 0;
1099         ENTRY;
1100
1101         if (set == NULL)
1102                 RETURN(0);
1103         LASSERT(set->set_exp);
1104         if (set->set_completes) {
1105                 rc = common_attr_done(set);
1106                 /* FIXME update qos data here */
1107         }
1108
1109         if (atomic_dec_and_test(&set->set_refcount))
1110                 lov_finish_set(set);
1111         RETURN(rc);
1112 }
1113
1114 int lov_update_setattr_set(struct lov_request_set *set,
1115                            struct lov_request *req, int rc)
1116 {
1117         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1118         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1119         ENTRY;
1120
1121         lov_update_set(set, req, rc);
1122
1123         /* grace error on inactive ost */
1124         if (rc && !(lov->lov_tgts[req->rq_idx] && 
1125                     lov->lov_tgts[req->rq_idx]->ltd_active))
1126                 rc = 0;
1127
1128         if (rc == 0) {
1129                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME)
1130                         lsm->lsm_oinfo[req->rq_stripe].loi_lvb.lvb_ctime =
1131                                 req->rq_oi.oi_oa->o_ctime;
1132                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME)
1133                         lsm->lsm_oinfo[req->rq_stripe].loi_lvb.lvb_mtime =
1134                                 req->rq_oi.oi_oa->o_mtime;
1135                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME)
1136                         lsm->lsm_oinfo[req->rq_stripe].loi_lvb.lvb_atime =
1137                                 req->rq_oi.oi_oa->o_atime;
1138         }
1139
1140         RETURN(rc);
1141 }
1142
1143 /* The callback for osc_setattr_async that finilizes a request info when a
1144  * response is recieved. */
1145 static int cb_setattr_update(struct obd_info *oinfo, int rc)
1146 {
1147         struct lov_request *lovreq;
1148         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1149         return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
1150 }
1151
1152 int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
1153                          struct obd_trans_info *oti,
1154                          struct lov_request_set **reqset)
1155 {
1156         struct lov_request_set *set;
1157         struct lov_oinfo *loi = NULL;
1158         struct lov_obd *lov = &exp->exp_obd->u.lov;
1159         int rc = 0, i;
1160         ENTRY;
1161
1162         OBD_ALLOC(set, sizeof(*set));
1163         if (set == NULL)
1164                 RETURN(-ENOMEM);
1165         lov_init_set(set);
1166
1167         set->set_exp = exp;
1168         set->set_oti = oti;
1169         set->set_oi = oinfo;
1170         if (oti != NULL && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
1171                 set->set_cookies = oti->oti_logcookies;
1172
1173         loi = oinfo->oi_md->lsm_oinfo;
1174         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++, loi++) {
1175                 struct lov_request *req;
1176
1177                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1178                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1179                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1180                         continue;
1181                 }
1182
1183                 OBD_ALLOC(req, sizeof(*req));
1184                 if (req == NULL)
1185                         GOTO(out_set, rc = -ENOMEM);
1186                 req->rq_stripe = i;
1187                 req->rq_idx = loi->loi_ost_idx;
1188
1189                 req->rq_oi.oi_oa = obdo_alloc();
1190                 if (req->rq_oi.oi_oa == NULL) {
1191                         OBD_FREE(req, sizeof(*req));
1192                         GOTO(out_set, rc = -ENOMEM);
1193                 }
1194                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1195                        sizeof(*req->rq_oi.oi_oa));
1196                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1197                 LASSERT(!(req->rq_oi.oi_oa->o_valid & OBD_MD_FLGROUP) 
1198                                 || req->rq_oi.oi_oa->o_gr>0);
1199                 req->rq_oi.oi_oa->o_stripe_idx = i;
1200                 req->rq_oi.oi_cb_up = cb_setattr_update;
1201                 req->rq_oi.oi_capa = oinfo->oi_capa;
1202                 req->rq_rqset = set;
1203
1204                 if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
1205                         int off = lov_stripe_offset(oinfo->oi_md,
1206                                                     oinfo->oi_oa->o_size, i,
1207                                                     &req->rq_oi.oi_oa->o_size);
1208
1209                         if (off < 0 && req->rq_oi.oi_oa->o_size)
1210                                 req->rq_oi.oi_oa->o_size--;
1211
1212                         CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1213                                i, req->rq_oi.oi_oa->o_size,
1214                                oinfo->oi_oa->o_size);
1215                 }
1216                 lov_set_add_req(req, set);
1217         }
1218         if (!set->set_count)
1219                 GOTO(out_set, rc = -EIO);
1220         *reqset = set;
1221         RETURN(rc);
1222 out_set:
1223         lov_fini_setattr_set(set);
1224         RETURN(rc);
1225 }
1226
1227 int lov_fini_punch_set(struct lov_request_set *set)
1228 {
1229         int rc = 0;
1230         ENTRY;
1231
1232         if (set == NULL)
1233                 RETURN(0);
1234         LASSERT(set->set_exp);
1235         if (set->set_completes) {
1236                 if (!set->set_success)
1237                         rc = -EIO;
1238                 /* FIXME update qos data here */
1239         }
1240
1241         if (atomic_dec_and_test(&set->set_refcount))
1242                 lov_finish_set(set);
1243
1244         RETURN(rc);
1245 }
1246
1247 /* The callback for osc_punch that finilizes a request info when a response
1248  * is recieved. */
1249 static int cb_update_punch(struct obd_info *oinfo, int rc)
1250 {
1251         struct lov_request *lovreq;
1252         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1253         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
1254 }
1255
1256 int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo,
1257                        struct obd_trans_info *oti,
1258                        struct lov_request_set **reqset)
1259 {
1260         struct lov_request_set *set;
1261         struct lov_oinfo *loi = NULL;
1262         struct lov_obd *lov = &exp->exp_obd->u.lov;
1263         int rc = 0, i;
1264         ENTRY;
1265
1266         OBD_ALLOC(set, sizeof(*set));
1267         if (set == NULL)
1268                 RETURN(-ENOMEM);
1269         lov_init_set(set);
1270
1271         set->set_oi = oinfo;
1272         set->set_exp = exp;
1273
1274         loi = oinfo->oi_md->lsm_oinfo;
1275         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++, loi++) {
1276                 struct lov_request *req;
1277                 obd_off rs, re;
1278
1279                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1280                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1281                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1282                         continue;
1283                 }
1284
1285                 if (!lov_stripe_intersects(oinfo->oi_md, i,
1286                                            oinfo->oi_policy.l_extent.start,
1287                                            oinfo->oi_policy.l_extent.end,
1288                                            &rs, &re))
1289                         continue;
1290
1291                 OBD_ALLOC(req, sizeof(*req));
1292                 if (req == NULL)
1293                         GOTO(out_set, rc = -ENOMEM);
1294                 req->rq_stripe = i;
1295                 req->rq_idx = loi->loi_ost_idx;
1296
1297                 req->rq_oi.oi_oa = obdo_alloc();
1298                 if (req->rq_oi.oi_oa == NULL) {
1299                         OBD_FREE(req, sizeof(*req));
1300                         GOTO(out_set, rc = -ENOMEM);
1301                 }
1302                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1303                        sizeof(*req->rq_oi.oi_oa));
1304                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1305                 req->rq_oi.oi_oa->o_gr = loi->loi_gr;
1306                 req->rq_oi.oi_oa->o_valid |= OBD_MD_FLGROUP;
1307
1308                 req->rq_oi.oi_oa->o_stripe_idx = i;
1309                 req->rq_oi.oi_cb_up = cb_update_punch;
1310                 req->rq_rqset = set;
1311
1312                 req->rq_oi.oi_policy.l_extent.start = rs;
1313                 req->rq_oi.oi_policy.l_extent.end = re;
1314                 req->rq_oi.oi_policy.l_extent.gid = -1;
1315
1316                 req->rq_oi.oi_capa = oinfo->oi_capa;
1317
1318                 lov_set_add_req(req, set);
1319         }
1320         if (!set->set_count)
1321                 GOTO(out_set, rc = -EIO);
1322         *reqset = set;
1323         RETURN(rc);
1324 out_set:
1325         lov_fini_punch_set(set);
1326         RETURN(rc);
1327 }
1328
1329 int lov_fini_sync_set(struct lov_request_set *set)
1330 {
1331         int rc = 0;
1332         ENTRY;
1333
1334         if (set == NULL)
1335                 RETURN(0);
1336         LASSERT(set->set_exp);
1337         if (set->set_completes) {
1338                 if (!set->set_success)
1339                         rc = -EIO;
1340                 /* FIXME update qos data here */
1341         }
1342
1343         if (atomic_dec_and_test(&set->set_refcount))
1344                 lov_finish_set(set);
1345
1346         RETURN(rc);
1347 }
1348
1349 int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
1350                       struct obdo *src_oa, struct lov_stripe_md *lsm,
1351                       obd_off start, obd_off end,
1352                       struct lov_request_set **reqset)
1353 {
1354         struct lov_request_set *set;
1355         struct lov_oinfo *loi = NULL;
1356         struct lov_obd *lov = &exp->exp_obd->u.lov;
1357         int rc = 0, i;
1358         ENTRY;
1359
1360         OBD_ALLOC(set, sizeof(*set));
1361         if (set == NULL)
1362                 RETURN(-ENOMEM);
1363         lov_init_set(set);
1364
1365         set->set_exp = exp;
1366         set->set_oi = oinfo;
1367         set->set_oi->oi_md = lsm;
1368         set->set_oi->oi_oa = src_oa;
1369
1370         loi = lsm->lsm_oinfo;
1371         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1372                 struct lov_request *req;
1373                 obd_off rs, re;
1374
1375                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1376                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1377                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1378                         continue;
1379                 }
1380
1381                 if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
1382                         continue;
1383
1384                 OBD_ALLOC(req, sizeof(*req));
1385                 if (req == NULL)
1386                         GOTO(out_set, rc = -ENOMEM);
1387                 req->rq_stripe = i;
1388                 req->rq_idx = loi->loi_ost_idx;
1389
1390                 req->rq_oi.oi_oa = obdo_alloc();
1391                 if (req->rq_oi.oi_oa == NULL) {
1392                         OBD_FREE(req, sizeof(*req));
1393                         GOTO(out_set, rc = -ENOMEM);
1394                 }
1395                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1396                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1397                 req->rq_oi.oi_oa->o_stripe_idx = i;
1398
1399                 req->rq_oi.oi_policy.l_extent.start = rs;
1400                 req->rq_oi.oi_policy.l_extent.end = re;
1401                 req->rq_oi.oi_policy.l_extent.gid = -1;
1402
1403                 lov_set_add_req(req, set);
1404         }
1405         if (!set->set_count)
1406                 GOTO(out_set, rc = -EIO);
1407         *reqset = set;
1408         RETURN(rc);
1409 out_set:
1410         lov_fini_sync_set(set);
1411         RETURN(rc);
1412 }
1413
/* Largest value a __u64 can hold. */
#define LOV_U64_MAX (~(__u64)0)

/* Saturating add: tot += add, clamped to LOV_U64_MAX when the sum would wrap.
 * Both arguments are evaluated more than once, so pass expressions without
 * side effects. */
#define LOV_SUM_MAX(tot, add)                                           \
        do {                                                            \
                if ((tot) + (add) >= (tot))                             \
                        (tot) += (add);                                 \
                else                                                    \
                        (tot) = LOV_U64_MAX;                            \
        } while (0)
1422
1423 int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success)
1424 {
1425         ENTRY;
1426
1427         if (success) {
1428                 __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov, 0);
1429
1430                 if (osfs->os_files != LOV_U64_MAX)
1431                         do_div(osfs->os_files, expected_stripes);
1432                 if (osfs->os_ffree != LOV_U64_MAX)
1433                         do_div(osfs->os_ffree, expected_stripes);
1434
1435                 spin_lock(&obd->obd_osfs_lock);
1436                 memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
1437                 obd->obd_osfs_age = get_jiffies_64();
1438                 spin_unlock(&obd->obd_osfs_lock);
1439                 RETURN(0);
1440         }
1441
1442         RETURN(-EIO);
1443 }
1444
1445 int lov_fini_statfs_set(struct lov_request_set *set)
1446 {
1447         int rc = 0;
1448         ENTRY;
1449
1450         if (set == NULL)
1451                 RETURN(0);
1452
1453         if (set->set_completes) {
1454                 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
1455                                      set->set_success);
1456         }
1457
1458         if (atomic_dec_and_test(&set->set_refcount))
1459                 lov_finish_set(set);
1460
1461         RETURN(rc);
1462 }
1463
1464 void lov_update_statfs(struct obd_device *obd, struct obd_statfs *osfs,
1465                        struct obd_statfs *lov_sfs, int success)
1466 {
1467         spin_lock(&obd->obd_osfs_lock);
1468         memcpy(&obd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
1469         obd->obd_osfs_age = get_jiffies_64();
1470         spin_unlock(&obd->obd_osfs_lock);
1471
1472         if (success == 0) {
1473                 memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
1474         } else {
1475 #ifdef MIN_DF
1476                 /* Sandia requested that df (and so, statfs) only
1477                    returned minimal available space on
1478                    a single OST, so people would be able to
1479                    write this much data guaranteed. */
1480                 if (osfs->os_bavail > lov_sfs->os_bavail) {
1481                         /* Presumably if new bavail is smaller,
1482                            new bfree is bigger as well */
1483                         osfs->os_bfree = lov_sfs->os_bfree;
1484                         osfs->os_bavail = lov_sfs->os_bavail;
1485                 }
1486 #else
1487                 osfs->os_bfree += lov_sfs->os_bfree;
1488                 osfs->os_bavail += lov_sfs->os_bavail;
1489 #endif
1490                 osfs->os_blocks += lov_sfs->os_blocks;
1491                 /* XXX not sure about this one - depends on policy.
1492                  *   - could be minimum if we always stripe on all OBDs
1493                  *     (but that would be wrong for any other policy,
1494                  *     if one of the OBDs has no more objects left)
1495                  *   - could be sum if we stripe whole objects
1496                  *   - could be average, just to give a nice number
1497                  *
1498                  * To give a "reasonable" (if not wholly accurate)
1499                  * number, we divide the total number of free objects
1500                  * by expected stripe count (watch out for overflow).
1501                  */
1502                 LOV_SUM_MAX(osfs->os_files, lov_sfs->os_files);
1503                 LOV_SUM_MAX(osfs->os_ffree, lov_sfs->os_ffree);
1504         }
1505 }
1506
1507 /* The callback for osc_statfs_async that finilizes a request info when a
1508  * response is recieved. */
1509 static int cb_statfs_update(struct obd_info *oinfo, int rc)
1510 {
1511         struct lov_request *lovreq;
1512         struct obd_statfs *osfs, *lov_sfs;
1513         struct obd_device *obd;
1514         struct lov_obd *lov;
1515         int success;
1516         ENTRY;
1517
1518         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1519         lov = &lovreq->rq_rqset->set_obd->u.lov;
1520         obd = class_exp2obd(lov->lov_tgts[lovreq->rq_idx]->ltd_exp);
1521
1522         osfs = lovreq->rq_rqset->set_oi->oi_osfs;
1523         lov_sfs = oinfo->oi_osfs;
1524
1525         success = lovreq->rq_rqset->set_success;
1526
1527         /* XXX: the same is done in lov_update_common_set, however
1528            lovset->set_exp is not initialized. */
1529         lov_update_set(lovreq->rq_rqset, lovreq, rc);
1530         if (rc) {
1531                 if (rc && !(lov->lov_tgts[lovreq->rq_idx] &&
1532                             lov->lov_tgts[lovreq->rq_idx]->ltd_active))
1533                         rc = 0;
1534                 RETURN(rc);
1535         }
1536
1537         lov_update_statfs(obd, osfs, lov_sfs, success);
1538         RETURN(0);
1539 }
1540
1541 int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
1542                         struct lov_request_set **reqset)
1543 {
1544         struct lov_request_set *set;
1545         struct lov_obd *lov = &obd->u.lov;
1546         int rc = 0, i;
1547         ENTRY;
1548
1549         OBD_ALLOC(set, sizeof(*set));
1550         if (set == NULL)
1551                 RETURN(-ENOMEM);
1552         lov_init_set(set);
1553
1554         set->set_obd = obd;
1555         set->set_oi = oinfo;
1556
1557         /* We only get block data from the OBD */
1558         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1559                 struct lov_request *req;
1560
1561                 if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_active) {
1562                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
1563                         continue;
1564                 }
1565
1566                 OBD_ALLOC(req, sizeof(*req));
1567                 if (req == NULL)
1568                         GOTO(out_set, rc = -ENOMEM);
1569
1570                 OBD_ALLOC(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs));
1571                 if (req->rq_oi.oi_osfs == NULL) {
1572                         OBD_FREE(req, sizeof(*req));
1573                         GOTO(out_set, rc = -ENOMEM);
1574                 }
1575
1576                 req->rq_idx = i;
1577                 req->rq_oi.oi_cb_up = cb_statfs_update;
1578                 req->rq_rqset = set;
1579
1580                 lov_set_add_req(req, set);
1581         }
1582         if (!set->set_count)
1583                 GOTO(out_set, rc = -EIO);
1584         *reqset = set;
1585         RETURN(rc);
1586 out_set:
1587         lov_fini_statfs_set(set);
1588         RETURN(rc);
1589 }