Whamcloud - gitweb
port the patch(9414) on HEAD(b1_5)
[fs/lustre-release.git] / lustre / lov / lov_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of the Lustre file system, http://www.lustre.org
7  *   Lustre is a trademark of Cluster File Systems, Inc.
8  *
9  *   You may have signed or agreed to another license before downloading
10  *   this software.  If so, you are bound by the terms and conditions
11  *   of that agreement, and the following does not apply to you.  See the
12  *   LICENSE file included with this distribution for more information.
13  *
14  *   If you did not agree to a different license, then this copy of Lustre
15  *   is open source software; you can redistribute it and/or modify it
16  *   under the terms of version 2 of the GNU General Public License as
17  *   published by the Free Software Foundation.
18  *
19  *   In either case, Lustre is distributed in the hope that it will be
20  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
21  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22  *   license text for more details.
23  */
24
25 #ifndef EXPORT_SYMTAB
26 # define EXPORT_SYMTAB
27 #endif
28 #define DEBUG_SUBSYSTEM S_LOV
29
30 #ifdef __KERNEL__
31 #include <libcfs/libcfs.h>
32 #else
33 #include <liblustre.h>
34 #endif
35
36 #include <obd_class.h>
37 #include <obd_lov.h>
38 #include <lustre/lustre_idl.h>
39
40 #include "lov_internal.h"
41
42 static void lov_init_set(struct lov_request_set *set)
43 {
44         set->set_count = 0;
45         set->set_completes = 0;
46         set->set_success = 0;
47         set->set_cookies = 0;
48         CFS_INIT_LIST_HEAD(&set->set_list);
49         atomic_set(&set->set_refcount, 1);
50 }
51
52 static void lov_finish_set(struct lov_request_set *set)
53 {
54         struct list_head *pos, *n;
55         ENTRY;
56
57         LASSERT(set);
58         list_for_each_safe(pos, n, &set->set_list) {
59                 struct lov_request *req = list_entry(pos, struct lov_request,
60                                                      rq_link);
61                 list_del_init(&req->rq_link);
62
63                 if (req->rq_oi.oi_oa)
64                         obdo_free(req->rq_oi.oi_oa);
65                 if (req->rq_oi.oi_md)
66                         OBD_FREE(req->rq_oi.oi_md, req->rq_buflen);
67                 if (req->rq_oi.oi_osfs)
68                         OBD_FREE(req->rq_oi.oi_osfs,
69                                  sizeof(*req->rq_oi.oi_osfs));
70                 OBD_FREE(req, sizeof(*req));
71         }
72
73         if (set->set_pga) {
74                 int len = set->set_oabufs * sizeof(*set->set_pga);
75                 OBD_FREE(set->set_pga, len);
76         }
77         if (set->set_lockh)
78                 lov_llh_put(set->set_lockh);
79
80         OBD_FREE(set, sizeof(*set));
81         EXIT;
82 }
83
84 void lov_update_set(struct lov_request_set *set,
85                     struct lov_request *req, int rc)
86 {
87         req->rq_complete = 1;
88         req->rq_rc = rc;
89
90         set->set_completes++;
91         if (rc == 0)
92                 set->set_success++;
93 }
94
95 int lov_update_common_set(struct lov_request_set *set,
96                           struct lov_request *req, int rc)
97 {
98         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
99         ENTRY;
100
101         lov_update_set(set, req, rc);
102
103         /* grace error on inactive ost */
104         if (rc && !(lov->lov_tgts[req->rq_idx] && 
105                     lov->lov_tgts[req->rq_idx]->ltd_active))
106                 rc = 0;
107
108         /* FIXME in raid1 regime, should return 0 */
109         RETURN(rc);
110 }
111
112 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
113 {
114         list_add_tail(&req->rq_link, &set->set_list);
115         set->set_count++;
116 }
117
118 int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc)
119 {
120         struct lov_request_set *set = req->rq_rqset;
121         struct lustre_handle *lov_lockhp;
122         struct lov_oinfo *loi;
123         ENTRY;
124
125         LASSERT(set != NULL);
126         LASSERT(set->set_oi != NULL);
127
128         lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
129         loi = set->set_oi->oi_md->lsm_oinfo[req->rq_stripe];
130
131         /* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set
132          * and that copy can be arbitrarily out of date.
133          *
134          * The LOV API is due for a serious rewriting anyways, and this
135          * can be addressed then. */
136
137         if (rc == ELDLM_OK) {
138                 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
139                 __u64 tmp;
140
141                 LASSERT(lock != NULL);
142                 lov_stripe_lock(set->set_oi->oi_md);
143                 loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb;
144                 tmp = loi->loi_lvb.lvb_size;
145                 /* Extend KMS up to the end of this lock and no further
146                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
147                 if (tmp > lock->l_policy_data.l_extent.end)
148                         tmp = lock->l_policy_data.l_extent.end + 1;
149                 if (tmp >= loi->loi_kms) {
150                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
151                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
152                         loi->loi_kms = tmp;
153                         loi->loi_kms_valid = 1;
154                 } else {
155                         LDLM_DEBUG(lock, "lock acquired, setting rss="
156                                    LPU64"; leaving kms="LPU64", end="LPU64,
157                                    loi->loi_lvb.lvb_size, loi->loi_kms,
158                                    lock->l_policy_data.l_extent.end);
159                 }
160                 lov_stripe_unlock(set->set_oi->oi_md);
161                 ldlm_lock_allow_match(lock);
162                 LDLM_LOCK_PUT(lock);
163         } else if ((rc == ELDLM_LOCK_ABORTED) &&
164                    (set->set_ei->ei_flags & LDLM_FL_HAS_INTENT)) {
165                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
166                 lov_stripe_lock(set->set_oi->oi_md);
167                 loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb;
168                 lov_stripe_unlock(set->set_oi->oi_md);
169                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
170                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
171                 rc = ELDLM_OK;
172         } else {
173                 struct obd_export *exp = set->set_exp;
174                 struct lov_obd *lov = &exp->exp_obd->u.lov;
175
176                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
177                 if (lov->lov_tgts[req->rq_idx] &&
178                     lov->lov_tgts[req->rq_idx]->ltd_active) {
179                         if (rc != -EINTR)
180                                 CERROR("enqueue objid "LPX64" subobj "
181                                        LPX64" on OST idx %d: rc %d\n",
182                                        set->set_oi->oi_md->lsm_object_id,
183                                        loi->loi_id, loi->loi_ost_idx, rc);
184                 } else {
185                         rc = ELDLM_OK;
186                 }
187         }
188         lov_update_set(set, req, rc);
189         RETURN(rc);
190 }
191
192 /* The callback for osc_enqueue that updates lov info for every OSC request. */
193 static int cb_update_enqueue(struct obd_info *oinfo, int rc)
194 {
195         struct obd_enqueue_info *einfo;
196         struct lov_request *lovreq;
197
198         lovreq = container_of(oinfo, struct lov_request, rq_oi);
199         einfo = lovreq->rq_rqset->set_ei;
200         return lov_update_enqueue_set(lovreq, einfo->ei_mode, rc);
201 }
202
203 static int enqueue_done(struct lov_request_set *set, __u32 mode)
204 {
205         struct lov_request *req;
206         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
207         int rc = 0;
208         ENTRY;
209
210         /* enqueue/match success, just return */
211         if (set->set_completes && set->set_completes == set->set_success)
212                 RETURN(0);
213
214         /* cancel enqueued/matched locks */
215         list_for_each_entry(req, &set->set_list, rq_link) {
216                 struct lustre_handle *lov_lockhp;
217
218                 if (!req->rq_complete || req->rq_rc)
219                         continue;
220
221                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
222                 LASSERT(lov_lockhp);
223                 if (!lustre_handle_is_used(lov_lockhp))
224                         continue;
225
226                 rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp,
227                                 req->rq_oi.oi_md, mode, lov_lockhp);
228                 if (rc && lov->lov_tgts[req->rq_idx] &&
229                     lov->lov_tgts[req->rq_idx]->ltd_active)
230                         CERROR("cancelling obdjid "LPX64" on OST "
231                                "idx %d error: rc = %d\n",
232                                req->rq_oi.oi_md->lsm_object_id,
233                                req->rq_idx, rc);
234         }
235         if (set->set_lockh)
236                 lov_llh_put(set->set_lockh);
237         RETURN(rc);
238 }
239
240 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc)
241 {
242         int ret = 0;
243         ENTRY;
244
245         if (set == NULL)
246                 RETURN(0);
247         LASSERT(set->set_exp);
248         /* Do enqueue_done only for sync requests and if any request
249          * succeeded. */
250         if (!set->set_ei->ei_rqset) {
251                 if (rc)
252                         set->set_completes = 0;
253                 ret = enqueue_done(set, mode);
254         } else if (set->set_lockh)
255                 lov_llh_put(set->set_lockh);
256
257         if (atomic_dec_and_test(&set->set_refcount))
258                 lov_finish_set(set);
259
260         RETURN(rc ? rc : ret);
261 }
262
263 int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
264                          struct obd_enqueue_info *einfo,
265                          struct lov_request_set **reqset)
266 {
267         struct lov_obd *lov = &exp->exp_obd->u.lov;
268         struct lov_request_set *set;
269         int i, rc = 0;
270         struct lov_oinfo *loi;
271         ENTRY;
272
273         OBD_ALLOC(set, sizeof(*set));
274         if (set == NULL)
275                 RETURN(-ENOMEM);
276         lov_init_set(set);
277
278         set->set_exp = exp;
279         set->set_oi = oinfo;
280         set->set_ei = einfo;
281         set->set_lockh = lov_llh_new(oinfo->oi_md);
282         if (set->set_lockh == NULL)
283                 GOTO(out_set, rc = -ENOMEM);
284         oinfo->oi_lockh->cookie = set->set_lockh->llh_handle.h_cookie;
285
286         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
287                 struct lov_request *req;
288                 obd_off start, end;
289
290                 loi = oinfo->oi_md->lsm_oinfo[i];
291                 if (!lov_stripe_intersects(oinfo->oi_md, i,
292                                            oinfo->oi_policy.l_extent.start,
293                                            oinfo->oi_policy.l_extent.end,
294                                            &start, &end))
295                         continue;
296
297                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
298                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
299                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
300                         continue;
301                 }
302
303                 OBD_ALLOC(req, sizeof(*req));
304                 if (req == NULL)
305                         GOTO(out_set, rc = -ENOMEM);
306
307                 req->rq_buflen = sizeof(*req->rq_oi.oi_md) +
308                         sizeof(struct lov_oinfo *) +
309                         sizeof(struct lov_oinfo);
310                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
311                 if (req->rq_oi.oi_md == NULL) {
312                         OBD_FREE(req, sizeof(*req));
313                         GOTO(out_set, rc = -ENOMEM);
314                 }
315                 req->rq_oi.oi_md->lsm_oinfo[0] =
316                         ((void *)req->rq_oi.oi_md) + sizeof(*req->rq_oi.oi_md) +
317                         sizeof(struct lov_oinfo *);
318
319
320                 req->rq_rqset = set;
321                 /* Set lov request specific parameters. */
322                 req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;
323                 req->rq_oi.oi_cb_up = cb_update_enqueue;
324
325                 LASSERT(req->rq_oi.oi_lockh);
326
327                 req->rq_oi.oi_policy.l_extent.gid =
328                         oinfo->oi_policy.l_extent.gid;
329                 req->rq_oi.oi_policy.l_extent.start = start;
330                 req->rq_oi.oi_policy.l_extent.end = end;
331
332                 req->rq_idx = loi->loi_ost_idx;
333                 req->rq_stripe = i;
334
335                 /* XXX LOV STACKING: submd should be from the subobj */
336                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
337                 req->rq_oi.oi_md->lsm_stripe_count = 0;
338                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms_valid =
339                         loi->loi_kms_valid;
340                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms = loi->loi_kms;
341                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb = loi->loi_lvb;
342
343                 lov_set_add_req(req, set);
344         }
345         if (!set->set_count)
346                 GOTO(out_set, rc = -EIO);
347         *reqset = set;
348         RETURN(0);
349 out_set:
350         lov_fini_enqueue_set(set, einfo->ei_mode, rc);
351         RETURN(rc);
352 }
353
354 int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
355                          int rc)
356 {
357         int ret = rc;
358         ENTRY;
359
360         if (rc == 1)
361                 ret = 0;
362         else if (rc == 0)
363                 ret = 1;
364         lov_update_set(set, req, ret);
365         RETURN(rc);
366 }
367
368 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
369 {
370         int rc = 0;
371         ENTRY;
372
373         if (set == NULL)
374                 RETURN(0);
375         LASSERT(set->set_exp);
376         rc = enqueue_done(set, mode);
377         if ((set->set_count == set->set_success) &&
378             (flags & LDLM_FL_TEST_LOCK))
379                 lov_llh_put(set->set_lockh);
380
381         if (atomic_dec_and_test(&set->set_refcount))
382                 lov_finish_set(set);
383
384         RETURN(rc);
385 }
386
387 int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo,
388                        struct lov_stripe_md *lsm, ldlm_policy_data_t *policy,
389                        __u32 mode, struct lustre_handle *lockh,
390                        struct lov_request_set **reqset)
391 {
392         struct lov_obd *lov = &exp->exp_obd->u.lov;
393         struct lov_request_set *set;
394         int i, rc = 0;
395         struct lov_oinfo *loi;
396         ENTRY;
397
398         OBD_ALLOC(set, sizeof(*set));
399         if (set == NULL)
400                 RETURN(-ENOMEM);
401         lov_init_set(set);
402
403         set->set_exp = exp;
404         set->set_oi = oinfo;
405         set->set_oi->oi_md = lsm;
406         set->set_lockh = lov_llh_new(lsm);
407         if (set->set_lockh == NULL)
408                 GOTO(out_set, rc = -ENOMEM);
409         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
410
411         for (i = 0; i < lsm->lsm_stripe_count; i++){
412                 struct lov_request *req;
413                 obd_off start, end;
414
415                 loi = lsm->lsm_oinfo[i];
416                 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
417                                            policy->l_extent.end, &start, &end))
418                         continue;
419
420                 /* FIXME raid1 should grace this error */
421                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
422                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
423                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
424                         GOTO(out_set, rc = -EIO);
425                 }
426
427                 OBD_ALLOC(req, sizeof(*req));
428                 if (req == NULL)
429                         GOTO(out_set, rc = -ENOMEM);
430
431                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
432                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
433                 if (req->rq_oi.oi_md == NULL) {
434                         OBD_FREE(req, sizeof(*req));
435                         GOTO(out_set, rc = -ENOMEM);
436                 }
437
438                 req->rq_oi.oi_policy.l_extent.start = start;
439                 req->rq_oi.oi_policy.l_extent.end = end;
440                 req->rq_oi.oi_policy.l_extent.gid = policy->l_extent.gid;
441
442                 req->rq_idx = loi->loi_ost_idx;
443                 req->rq_stripe = i;
444
445                 /* XXX LOV STACKING: submd should be from the subobj */
446                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
447                 req->rq_oi.oi_md->lsm_stripe_count = 0;
448
449                 lov_set_add_req(req, set);
450         }
451         if (!set->set_count)
452                 GOTO(out_set, rc = -EIO);
453         *reqset = set;
454         RETURN(rc);
455 out_set:
456         lov_fini_match_set(set, mode, 0);
457         RETURN(rc);
458 }
459
460 int lov_fini_cancel_set(struct lov_request_set *set)
461 {
462         int rc = 0;
463         ENTRY;
464
465         if (set == NULL)
466                 RETURN(0);
467
468         LASSERT(set->set_exp);
469         if (set->set_lockh)
470                 lov_llh_put(set->set_lockh);
471
472         if (atomic_dec_and_test(&set->set_refcount))
473                 lov_finish_set(set);
474
475         RETURN(rc);
476 }
477
478 int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,
479                         struct lov_stripe_md *lsm, __u32 mode,
480                         struct lustre_handle *lockh,
481                         struct lov_request_set **reqset)
482 {
483         struct lov_request_set *set;
484         int i, rc = 0;
485         struct lov_oinfo *loi;
486         ENTRY;
487
488         OBD_ALLOC(set, sizeof(*set));
489         if (set == NULL)
490                 RETURN(-ENOMEM);
491         lov_init_set(set);
492
493         set->set_exp = exp;
494         set->set_oi = oinfo;
495         set->set_oi->oi_md = lsm;
496         set->set_lockh = lov_handle2llh(lockh);
497         if (set->set_lockh == NULL) {
498                 CERROR("LOV: invalid lov lock handle %p\n", lockh);
499                 GOTO(out_set, rc = -EINVAL);
500         }
501         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
502
503         for (i = 0; i < lsm->lsm_stripe_count; i++){
504                 struct lov_request *req;
505                 struct lustre_handle *lov_lockhp;
506
507                 loi = lsm->lsm_oinfo[i];
508                 lov_lockhp = set->set_lockh->llh_handles + i;
509                 if (!lustre_handle_is_used(lov_lockhp)) {
510                         CDEBUG(D_HA, "lov idx %d subobj "LPX64" no lock?\n",
511                                loi->loi_ost_idx, loi->loi_id);
512                         continue;
513                 }
514
515                 OBD_ALLOC(req, sizeof(*req));
516                 if (req == NULL)
517                         GOTO(out_set, rc = -ENOMEM);
518
519                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
520                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
521                 if (req->rq_oi.oi_md == NULL) {
522                         OBD_FREE(req, sizeof(*req));
523                         GOTO(out_set, rc = -ENOMEM);
524                 }
525
526                 req->rq_idx = loi->loi_ost_idx;
527                 req->rq_stripe = i;
528
529                 /* XXX LOV STACKING: submd should be from the subobj */
530                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
531                 req->rq_oi.oi_md->lsm_stripe_count = 0;
532
533                 lov_set_add_req(req, set);
534         }
535         if (!set->set_count)
536                 GOTO(out_set, rc = -EIO);
537         *reqset = set;
538         RETURN(rc);
539 out_set:
540         lov_fini_cancel_set(set);
541         RETURN(rc);
542 }
543
544 static int create_done(struct obd_export *exp, struct lov_request_set *set,
545                        struct lov_stripe_md **lsmp)
546 {
547         struct lov_obd *lov = &exp->exp_obd->u.lov;
548         struct obd_trans_info *oti = set->set_oti;
549         struct obdo *src_oa = set->set_oi->oi_oa;
550         struct lov_request *req;
551         struct obdo *ret_oa = NULL;
552         int attrset = 0, rc = 0;
553         ENTRY;
554
555         LASSERT(set->set_completes);
556
557         /* try alloc objects on other osts if osc_create fails for
558          * exceptions: RPC failure, ENOSPC, etc */
559         if (set->set_count != set->set_success) {
560                 list_for_each_entry (req, &set->set_list, rq_link) {
561                         if (req->rq_rc == 0)
562                                 continue;
563
564                         set->set_completes--;
565                         req->rq_complete = 0;
566
567                         rc = qos_remedy_create(set, req);
568                         lov_update_create_set(set, req, rc);
569
570                         if (rc)
571                                 break;
572                 }
573         }
574
575         /* no successful creates */
576         if (set->set_success == 0)
577                 GOTO(cleanup, rc);
578
579         /* If there was an explicit stripe set, fail.  Otherwise, we
580          * got some objects and that's not bad. */
581         if (set->set_count != set->set_success) {
582                 if (*lsmp)
583                         GOTO(cleanup, rc);
584                 set->set_count = set->set_success;
585                 qos_shrink_lsm(set);
586         }
587
588         ret_oa = obdo_alloc();
589         if (ret_oa == NULL)
590                 GOTO(cleanup, rc = -ENOMEM);
591
592         list_for_each_entry(req, &set->set_list, rq_link) {
593                 if (!req->rq_complete || req->rq_rc)
594                         continue;
595                 lov_merge_attrs(ret_oa, req->rq_oi.oi_oa,
596                                 req->rq_oi.oi_oa->o_valid, set->set_oi->oi_md,
597                                 req->rq_stripe, &attrset);
598         }
599         if (src_oa->o_valid & OBD_MD_FLSIZE &&
600             ret_oa->o_size != src_oa->o_size) {
601                 CERROR("original size "LPU64" isn't new object size "LPU64"\n",
602                        src_oa->o_size, ret_oa->o_size);
603                 LBUG();
604         }
605         ret_oa->o_id = src_oa->o_id;
606         memcpy(src_oa, ret_oa, sizeof(*src_oa));
607         obdo_free(ret_oa);
608
609         *lsmp = set->set_oi->oi_md;
610         GOTO(done, rc = 0);
611
612 cleanup:
613         list_for_each_entry(req, &set->set_list, rq_link) {
614                 struct obd_export *sub_exp;
615                 int err = 0;
616
617                 if (!req->rq_complete || req->rq_rc)
618                         continue;
619
620                 sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp;
621                 err = obd_destroy(sub_exp, req->rq_oi.oi_oa, NULL, oti, NULL);
622                 if (err)
623                         CERROR("Failed to uncreate objid "LPX64" subobj "
624                                LPX64" on OST idx %d: rc = %d\n",
625                                src_oa->o_id, req->rq_oi.oi_oa->o_id,
626                                req->rq_idx, rc);
627         }
628         if (*lsmp == NULL)
629                 obd_free_memmd(exp, &set->set_oi->oi_md);
630 done:
631         if (oti && set->set_cookies) {
632                 oti->oti_logcookies = set->set_cookies;
633                 if (!set->set_cookie_sent) {
634                         oti_free_cookies(oti);
635                         src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
636                 } else {
637                         src_oa->o_valid |= OBD_MD_FLCOOKIE;
638                 }
639         }
640         RETURN(rc);
641 }
642
643 int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp)
644 {
645         int rc = 0;
646         ENTRY;
647
648         if (set == NULL)
649                 RETURN(0);
650         LASSERT(set->set_exp);
651         if (set->set_completes)
652                 rc = create_done(set->set_exp, set, lsmp);
653
654         if (atomic_dec_and_test(&set->set_refcount))
655                 lov_finish_set(set);
656
657         RETURN(rc);
658 }
659
660 int lov_update_create_set(struct lov_request_set *set,
661                           struct lov_request *req, int rc)
662 {
663         struct obd_trans_info *oti = set->set_oti;
664         struct lov_stripe_md *lsm = set->set_oi->oi_md;
665         struct lov_oinfo *loi;
666         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
667         ENTRY;
668
669         req->rq_stripe = set->set_success;
670         loi = lsm->lsm_oinfo[req->rq_stripe];
671
672         if (rc && lov->lov_tgts[req->rq_idx] &&
673             lov->lov_tgts[req->rq_idx]->ltd_active) {
674                 CERROR("error creating fid "LPX64" sub-object"
675                        " on OST idx %d/%d: rc = %d\n",
676                        set->set_oi->oi_oa->o_id, req->rq_idx,
677                        lsm->lsm_stripe_count, rc);
678                 if (rc > 0) {
679                         CERROR("obd_create returned invalid err %d\n", rc);
680                         rc = -EIO;
681                 }
682         }
683         lov_update_set(set, req, rc);
684         if (rc)
685                 RETURN(rc);
686
687         if (oti && oti->oti_objid)
688                 oti->oti_objid[req->rq_idx] = req->rq_oi.oi_oa->o_id;
689
690         loi->loi_id = req->rq_oi.oi_oa->o_id;
691         loi->loi_ost_idx = req->rq_idx;
692         CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n",
693                lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
694         loi_init(loi);
695
696         if (oti && set->set_cookies)
697                 ++oti->oti_logcookies;
698         if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE)
699                 set->set_cookie_sent++;
700
701         RETURN(0);
702 }
703
704 int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo,
705                         struct lov_stripe_md **lsmp, struct obdo *src_oa,
706                         struct obd_trans_info *oti,
707                         struct lov_request_set **reqset)
708 {
709         struct lov_request_set *set;
710         int rc = 0;
711         ENTRY;
712
713         OBD_ALLOC(set, sizeof(*set));
714         if (set == NULL)
715                 RETURN(-ENOMEM);
716         lov_init_set(set);
717
718         set->set_exp = exp;
719         set->set_oi = oinfo;
720         set->set_oi->oi_md = *lsmp;
721         set->set_oi->oi_oa = src_oa;
722         set->set_oti = oti;
723
724         rc = qos_prep_create(exp, set);
725         if (rc)
726                 lov_fini_create_set(set, lsmp);
727         else
728                 *reqset = set;
729         RETURN(rc);
730 }
731
732 static int common_attr_done(struct lov_request_set *set)
733 {
734         struct list_head *pos;
735         struct lov_request *req;
736         struct obdo *tmp_oa;
737         int rc = 0, attrset = 0;
738         ENTRY;
739
740         LASSERT(set->set_oi != NULL);
741
742         if (set->set_oi->oi_oa == NULL)
743                 RETURN(0);
744
745         if (!set->set_success)
746                 RETURN(-EIO);
747
748         tmp_oa = obdo_alloc();
749         if (tmp_oa == NULL)
750                 GOTO(out, rc = -ENOMEM);
751
752         list_for_each (pos, &set->set_list) {
753                 req = list_entry(pos, struct lov_request, rq_link);
754
755                 if (!req->rq_complete || req->rq_rc)
756                         continue;
757                 if (req->rq_oi.oi_oa->o_valid == 0)   /* inactive stripe */
758                         continue;
759                 lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa,
760                                 req->rq_oi.oi_oa->o_valid,
761                                 set->set_oi->oi_md, req->rq_stripe, &attrset);
762         }
763         if (!attrset) {
764                 CERROR("No stripes had valid attrs\n");
765                 rc = -EIO;
766         }
767         tmp_oa->o_id = set->set_oi->oi_oa->o_id;
768         memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
769 out:
770         if (tmp_oa)
771                 obdo_free(tmp_oa);
772         RETURN(rc);
773
774 }
775
776 static int brw_done(struct lov_request_set *set)
777 {
778         struct lov_stripe_md *lsm = set->set_oi->oi_md;
779         struct lov_oinfo     *loi = NULL;
780         struct list_head *pos;
781         struct lov_request *req;
782         ENTRY;
783
784         list_for_each (pos, &set->set_list) {
785                 req = list_entry(pos, struct lov_request, rq_link);
786
787                 if (!req->rq_complete || req->rq_rc)
788                         continue;
789
790                 loi = lsm->lsm_oinfo[req->rq_stripe];
791
792                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS)
793                         loi->loi_lvb.lvb_blocks = req->rq_oi.oi_oa->o_blocks;
794         }
795
796         RETURN(0);
797 }
798
799 int lov_fini_brw_set(struct lov_request_set *set)
800 {
801         int rc = 0;
802         ENTRY;
803
804         if (set == NULL)
805                 RETURN(0);
806         LASSERT(set->set_exp);
807         if (set->set_completes) {
808                 rc = brw_done(set);
809                 /* FIXME update qos data here */
810         }
811         if (atomic_dec_and_test(&set->set_refcount))
812                 lov_finish_set(set);
813
814         RETURN(rc);
815 }
816
817 int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
818                      obd_count oa_bufs, struct brw_page *pga,
819                      struct obd_trans_info *oti,
820                      struct lov_request_set **reqset)
821 {
822         struct {
823                 obd_count       index;
824                 obd_count       count;
825                 obd_count       off;
826         } *info = NULL;
827         struct lov_request_set *set;
828         struct lov_oinfo *loi = NULL;
829         struct lov_obd *lov = &exp->exp_obd->u.lov;
830         int rc = 0, i, shift;
831         ENTRY;
832
833         OBD_ALLOC(set, sizeof(*set));
834         if (set == NULL)
835                 RETURN(-ENOMEM);
836         lov_init_set(set);
837
838         set->set_exp = exp;
839         set->set_oti = oti;
840         set->set_oi = oinfo;
841         set->set_oabufs = oa_bufs;
842         OBD_ALLOC(set->set_pga, oa_bufs * sizeof(*set->set_pga));
843         if (!set->set_pga)
844                 GOTO(out, rc = -ENOMEM);
845
846         OBD_ALLOC(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
847         if (!info)
848                 GOTO(out, rc = -ENOMEM);
849
850         /* calculate the page count for each stripe */
851         for (i = 0; i < oa_bufs; i++) {
852                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
853                 info[stripe].count++;
854         }
855
856         /* alloc and initialize lov request */
857         shift = 0;
858         for (i = 0 ; i < oinfo->oi_md->lsm_stripe_count; i++){
859                 struct lov_request *req;
860
861                 if (info[i].count == 0)
862                         continue;
863
864                 loi = oinfo->oi_md->lsm_oinfo[i];
865                 if (!lov->lov_tgts[loi->loi_ost_idx] || 
866                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
867                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
868                         GOTO(out, rc = -EIO);
869                 }
870
871                 OBD_ALLOC(req, sizeof(*req));
872                 if (req == NULL)
873                         GOTO(out, rc = -ENOMEM);
874
875                 req->rq_oi.oi_oa = obdo_alloc();
876                 if (req->rq_oi.oi_oa == NULL) {
877                         OBD_FREE(req, sizeof(*req));
878                         GOTO(out, rc = -ENOMEM);
879                 }
880
881                 if (oinfo->oi_oa) {
882                         memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
883                                sizeof(*req->rq_oi.oi_oa));
884                 }
885                 req->rq_oi.oi_oa->o_id = loi->loi_id;
886                 req->rq_oi.oi_oa->o_stripe_idx = i;
887
888                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
889                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
890                 if (req->rq_oi.oi_md == NULL) {
891                         obdo_free(req->rq_oi.oi_oa);
892                         OBD_FREE(req, sizeof(*req));
893                         GOTO(out, rc = -ENOMEM);
894                 }
895
896                 req->rq_idx = loi->loi_ost_idx;
897                 req->rq_stripe = i;
898
899                 /* XXX LOV STACKING */
900                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
901                 req->rq_oi.oi_md->lsm_object_gr = oinfo->oi_md->lsm_object_gr;
902                 req->rq_oabufs = info[i].count;
903                 req->rq_pgaidx = shift;
904                 shift += req->rq_oabufs;
905
906                 /* remember the index for sort brw_page array */
907                 info[i].index = req->rq_pgaidx;
908
909                 lov_set_add_req(req, set);
910         }
911         if (!set->set_count)
912                 GOTO(out, rc = -EIO);
913
914         /* rotate & sort the brw_page array */
915         for (i = 0; i < oa_bufs; i++) {
916                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
917
918                 shift = info[stripe].index + info[stripe].off;
919                 LASSERT(shift < oa_bufs);
920                 set->set_pga[shift] = pga[i];
921                 lov_stripe_offset(oinfo->oi_md, pga[i].off, stripe,
922                                   &set->set_pga[shift].off);
923                 info[stripe].off++;
924         }
925 out:
926         if (info)
927                 OBD_FREE(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
928
929         if (rc == 0)
930                 *reqset = set;
931         else
932                 lov_fini_brw_set(set);
933
934         RETURN(rc);
935 }
936
937 int lov_fini_getattr_set(struct lov_request_set *set)
938 {
939         int rc = 0;
940         ENTRY;
941
942         if (set == NULL)
943                 RETURN(0);
944         LASSERT(set->set_exp);
945         if (set->set_completes)
946                 rc = common_attr_done(set);
947
948         if (atomic_dec_and_test(&set->set_refcount))
949                 lov_finish_set(set);
950
951         RETURN(rc);
952 }
953
954 /* The callback for osc_getattr_async that finilizes a request info when a
955  * response is recieved. */
956 static int cb_getattr_update(struct obd_info *oinfo, int rc)
957 {
958         struct lov_request *lovreq;
959         lovreq = container_of(oinfo, struct lov_request, rq_oi);
960         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
961 }
962
963 int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
964                          struct lov_request_set **reqset)
965 {
966         struct lov_request_set *set;
967         struct lov_oinfo *loi = NULL;
968         struct lov_obd *lov = &exp->exp_obd->u.lov;
969         int rc = 0, i;
970         ENTRY;
971
972         OBD_ALLOC(set, sizeof(*set));
973         if (set == NULL)
974                 RETURN(-ENOMEM);
975         lov_init_set(set);
976
977         set->set_exp = exp;
978         set->set_oi = oinfo;
979
980         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
981                 struct lov_request *req;
982
983                 loi = oinfo->oi_md->lsm_oinfo[i];
984                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
985                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
986                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
987                         continue;
988                 }
989
990                 OBD_ALLOC(req, sizeof(*req));
991                 if (req == NULL)
992                         GOTO(out_set, rc = -ENOMEM);
993
994                 req->rq_stripe = i;
995                 req->rq_idx = loi->loi_ost_idx;
996
997                 req->rq_oi.oi_oa = obdo_alloc();
998                 if (req->rq_oi.oi_oa == NULL) {
999                         OBD_FREE(req, sizeof(*req));
1000                         GOTO(out_set, rc = -ENOMEM);
1001                 }
1002                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1003                        sizeof(*req->rq_oi.oi_oa));
1004                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1005                 req->rq_oi.oi_cb_up = cb_getattr_update;
1006
1007                 lov_set_add_req(req, set);
1008         }
1009         if (!set->set_count)
1010                 GOTO(out_set, rc = -EIO);
1011         *reqset = set;
1012         RETURN(rc);
1013 out_set:
1014         lov_fini_getattr_set(set);
1015         RETURN(rc);
1016 }
1017
1018 int lov_fini_destroy_set(struct lov_request_set *set)
1019 {
1020         ENTRY;
1021
1022         if (set == NULL)
1023                 RETURN(0);
1024         LASSERT(set->set_exp);
1025         if (set->set_completes) {
1026                 /* FIXME update qos data here */
1027         }
1028
1029         if (atomic_dec_and_test(&set->set_refcount))
1030                 lov_finish_set(set);
1031
1032         RETURN(0);
1033 }
1034
1035 int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
1036                          struct obdo *src_oa, struct lov_stripe_md *lsm,
1037                          struct obd_trans_info *oti,
1038                          struct lov_request_set **reqset)
1039 {
1040         struct lov_request_set *set;
1041         struct lov_oinfo *loi = NULL;
1042         struct lov_obd *lov = &exp->exp_obd->u.lov;
1043         int rc = 0, i;
1044         ENTRY;
1045
1046         OBD_ALLOC(set, sizeof(*set));
1047         if (set == NULL)
1048                 RETURN(-ENOMEM);
1049         lov_init_set(set);
1050
1051         set->set_exp = exp;
1052         set->set_oi = oinfo;
1053         set->set_oi->oi_md = lsm;
1054         set->set_oi->oi_oa = src_oa;
1055         set->set_oti = oti;
1056         if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
1057                 set->set_cookies = oti->oti_logcookies;
1058
1059         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1060                 struct lov_request *req;
1061
1062                 loi = lsm->lsm_oinfo[i];
1063                 if (!lov->lov_tgts[loi->loi_ost_idx] || 
1064                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1065                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1066                         continue;
1067                 }
1068
1069                 OBD_ALLOC(req, sizeof(*req));
1070                 if (req == NULL)
1071                         GOTO(out_set, rc = -ENOMEM);
1072
1073                 req->rq_stripe = i;
1074                 req->rq_idx = loi->loi_ost_idx;
1075
1076                 req->rq_oi.oi_oa = obdo_alloc();
1077                 if (req->rq_oi.oi_oa == NULL) {
1078                         OBD_FREE(req, sizeof(*req));
1079                         GOTO(out_set, rc = -ENOMEM);
1080                 }
1081                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1082                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1083                 lov_set_add_req(req, set);
1084         }
1085         if (!set->set_count)
1086                 GOTO(out_set, rc = -EIO);
1087         *reqset = set;
1088         RETURN(rc);
1089 out_set:
1090         lov_fini_destroy_set(set);
1091         RETURN(rc);
1092 }
1093
1094 int lov_fini_setattr_set(struct lov_request_set *set)
1095 {
1096         int rc = 0;
1097         ENTRY;
1098
1099         if (set == NULL)
1100                 RETURN(0);
1101         LASSERT(set->set_exp);
1102         if (set->set_completes) {
1103                 rc = common_attr_done(set);
1104                 /* FIXME update qos data here */
1105         }
1106
1107         if (atomic_dec_and_test(&set->set_refcount))
1108                 lov_finish_set(set);
1109         RETURN(rc);
1110 }
1111
1112 int lov_update_setattr_set(struct lov_request_set *set,
1113                            struct lov_request *req, int rc)
1114 {
1115         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1116         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1117         ENTRY;
1118
1119         lov_update_set(set, req, rc);
1120
1121         /* grace error on inactive ost */
1122         if (rc && !(lov->lov_tgts[req->rq_idx] && 
1123                     lov->lov_tgts[req->rq_idx]->ltd_active))
1124                 rc = 0;
1125
1126         if (rc == 0) {
1127                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME)
1128                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_ctime =
1129                                 req->rq_oi.oi_oa->o_ctime;
1130                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME)
1131                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_mtime =
1132                                 req->rq_oi.oi_oa->o_mtime;
1133                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME)
1134                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_atime =
1135                                 req->rq_oi.oi_oa->o_atime;
1136         }
1137
1138         RETURN(rc);
1139 }
1140
1141 /* The callback for osc_setattr_async that finilizes a request info when a
1142  * response is recieved. */
1143 static int cb_setattr_update(struct obd_info *oinfo, int rc)
1144 {
1145         struct lov_request *lovreq;
1146         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1147         return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
1148 }
1149
1150 int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
1151                          struct obd_trans_info *oti,
1152                          struct lov_request_set **reqset)
1153 {
1154         struct lov_request_set *set;
1155         struct lov_oinfo *loi = NULL;
1156         struct lov_obd *lov = &exp->exp_obd->u.lov;
1157         int rc = 0, i;
1158         ENTRY;
1159
1160         OBD_ALLOC(set, sizeof(*set));
1161         if (set == NULL)
1162                 RETURN(-ENOMEM);
1163         lov_init_set(set);
1164
1165         set->set_exp = exp;
1166         set->set_oti = oti;
1167         set->set_oi = oinfo;
1168         if (oti != NULL && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
1169                 set->set_cookies = oti->oti_logcookies;
1170
1171         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1172                 struct lov_request *req;
1173
1174                 loi = oinfo->oi_md->lsm_oinfo[i];
1175                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1176                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1177                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1178                         continue;
1179                 }
1180
1181                 OBD_ALLOC(req, sizeof(*req));
1182                 if (req == NULL)
1183                         GOTO(out_set, rc = -ENOMEM);
1184                 req->rq_stripe = i;
1185                 req->rq_idx = loi->loi_ost_idx;
1186
1187                 req->rq_oi.oi_oa = obdo_alloc();
1188                 if (req->rq_oi.oi_oa == NULL) {
1189                         OBD_FREE(req, sizeof(*req));
1190                         GOTO(out_set, rc = -ENOMEM);
1191                 }
1192                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1193                        sizeof(*req->rq_oi.oi_oa));
1194                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1195                 req->rq_oi.oi_oa->o_stripe_idx = i;
1196                 req->rq_oi.oi_cb_up = cb_setattr_update;
1197                 req->rq_rqset = set;
1198
1199                 if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
1200                         int off = lov_stripe_offset(oinfo->oi_md,
1201                                                     oinfo->oi_oa->o_size, i,
1202                                                     &req->rq_oi.oi_oa->o_size);
1203
1204                         if (off < 0 && req->rq_oi.oi_oa->o_size)
1205                                 req->rq_oi.oi_oa->o_size--;
1206
1207                         CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1208                                i, req->rq_oi.oi_oa->o_size,
1209                                oinfo->oi_oa->o_size);
1210                 }
1211                 lov_set_add_req(req, set);
1212         }
1213         if (!set->set_count)
1214                 GOTO(out_set, rc = -EIO);
1215         *reqset = set;
1216         RETURN(rc);
1217 out_set:
1218         lov_fini_setattr_set(set);
1219         RETURN(rc);
1220 }
1221
1222 int lov_fini_punch_set(struct lov_request_set *set)
1223 {
1224         int rc = 0;
1225         ENTRY;
1226
1227         if (set == NULL)
1228                 RETURN(0);
1229         LASSERT(set->set_exp);
1230         if (set->set_completes) {
1231                 rc = -EIO;
1232                 /* FIXME update qos data here */
1233                 if (set->set_success)
1234                         rc = common_attr_done(set);
1235         }
1236
1237         if (atomic_dec_and_test(&set->set_refcount))
1238                 lov_finish_set(set);
1239
1240         RETURN(rc);
1241 }
1242
1243 int lov_update_punch_set(struct lov_request_set *set,
1244                            struct lov_request *req, int rc)
1245 {
1246         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1247         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1248         ENTRY;
1249
1250         lov_update_set(set, req, rc);
1251
1252         /* grace error on inactive ost */
1253         if (rc && !lov->lov_tgts[req->rq_idx]->ltd_active)
1254                 rc = 0;
1255
1256         if (rc == 0) {
1257                 lov_stripe_lock(lsm);
1258                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS) {
1259                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_blocks =
1260                                 req->rq_oi.oi_oa->o_blocks;
1261                 }
1262
1263                 /* Do we need to update lvb_size here? It needn't because
1264                  * it have been done in ll_truncate(). -jay */
1265                 lov_stripe_unlock(lsm);
1266         }
1267
1268         RETURN(rc);
1269 }
1270
1271 /* The callback for osc_punch that finilizes a request info when a response
1272  * is recieved. */
1273 static int cb_update_punch(struct obd_info *oinfo, int rc)
1274 {
1275         struct lov_request *lovreq;
1276         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1277         return lov_update_punch_set(lovreq->rq_rqset, lovreq, rc);
1278 }
1279
1280 int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo,
1281                        struct obd_trans_info *oti,
1282                        struct lov_request_set **reqset)
1283 {
1284         struct lov_request_set *set;
1285         struct lov_oinfo *loi = NULL;
1286         struct lov_obd *lov = &exp->exp_obd->u.lov;
1287         int rc = 0, i;
1288         ENTRY;
1289
1290         OBD_ALLOC(set, sizeof(*set));
1291         if (set == NULL)
1292                 RETURN(-ENOMEM);
1293         lov_init_set(set);
1294
1295         set->set_oi = oinfo;
1296         set->set_exp = exp;
1297
1298         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1299                 struct lov_request *req;
1300                 obd_off rs, re;
1301
1302                 loi = oinfo->oi_md->lsm_oinfo[i];
1303                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1304                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1305                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1306                         continue;
1307                 }
1308
1309                 if (!lov_stripe_intersects(oinfo->oi_md, i,
1310                                            oinfo->oi_policy.l_extent.start,
1311                                            oinfo->oi_policy.l_extent.end,
1312                                            &rs, &re))
1313                         continue;
1314
1315                 OBD_ALLOC(req, sizeof(*req));
1316                 if (req == NULL)
1317                         GOTO(out_set, rc = -ENOMEM);
1318                 req->rq_stripe = i;
1319                 req->rq_idx = loi->loi_ost_idx;
1320
1321                 req->rq_oi.oi_oa = obdo_alloc();
1322                 if (req->rq_oi.oi_oa == NULL) {
1323                         OBD_FREE(req, sizeof(*req));
1324                         GOTO(out_set, rc = -ENOMEM);
1325                 }
1326                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1327                        sizeof(*req->rq_oi.oi_oa));
1328                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1329                 req->rq_oi.oi_oa->o_stripe_idx = i;
1330                 req->rq_oi.oi_cb_up = cb_update_punch;
1331                 req->rq_rqset = set;
1332
1333                 req->rq_oi.oi_policy.l_extent.start = rs;
1334                 req->rq_oi.oi_policy.l_extent.end = re;
1335                 req->rq_oi.oi_policy.l_extent.gid = -1;
1336
1337                 lov_set_add_req(req, set);
1338         }
1339         if (!set->set_count)
1340                 GOTO(out_set, rc = -EIO);
1341         *reqset = set;
1342         RETURN(rc);
1343 out_set:
1344         lov_fini_punch_set(set);
1345         RETURN(rc);
1346 }
1347
1348 int lov_fini_sync_set(struct lov_request_set *set)
1349 {
1350         int rc = 0;
1351         ENTRY;
1352
1353         if (set == NULL)
1354                 RETURN(0);
1355         LASSERT(set->set_exp);
1356         if (set->set_completes) {
1357                 if (!set->set_success)
1358                         rc = -EIO;
1359                 /* FIXME update qos data here */
1360         }
1361
1362         if (atomic_dec_and_test(&set->set_refcount))
1363                 lov_finish_set(set);
1364
1365         RETURN(rc);
1366 }
1367
1368 int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
1369                       struct obdo *src_oa, struct lov_stripe_md *lsm,
1370                       obd_off start, obd_off end,
1371                       struct lov_request_set **reqset)
1372 {
1373         struct lov_request_set *set;
1374         struct lov_oinfo *loi = NULL;
1375         struct lov_obd *lov = &exp->exp_obd->u.lov;
1376         int rc = 0, i;
1377         ENTRY;
1378
1379         OBD_ALLOC(set, sizeof(*set));
1380         if (set == NULL)
1381                 RETURN(-ENOMEM);
1382         lov_init_set(set);
1383
1384         set->set_exp = exp;
1385         set->set_oi = oinfo;
1386         set->set_oi->oi_md = lsm;
1387         set->set_oi->oi_oa = src_oa;
1388
1389         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1390                 struct lov_request *req;
1391                 obd_off rs, re;
1392
1393                 loi = lsm->lsm_oinfo[i];
1394                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1395                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1396                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1397                         continue;
1398                 }
1399
1400                 if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
1401                         continue;
1402
1403                 OBD_ALLOC(req, sizeof(*req));
1404                 if (req == NULL)
1405                         GOTO(out_set, rc = -ENOMEM);
1406                 req->rq_stripe = i;
1407                 req->rq_idx = loi->loi_ost_idx;
1408
1409                 req->rq_oi.oi_oa = obdo_alloc();
1410                 if (req->rq_oi.oi_oa == NULL) {
1411                         OBD_FREE(req, sizeof(*req));
1412                         GOTO(out_set, rc = -ENOMEM);
1413                 }
1414                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1415                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1416                 req->rq_oi.oi_oa->o_stripe_idx = i;
1417
1418                 req->rq_oi.oi_policy.l_extent.start = rs;
1419                 req->rq_oi.oi_policy.l_extent.end = re;
1420                 req->rq_oi.oi_policy.l_extent.gid = -1;
1421
1422                 lov_set_add_req(req, set);
1423         }
1424         if (!set->set_count)
1425                 GOTO(out_set, rc = -EIO);
1426         *reqset = set;
1427         RETURN(rc);
1428 out_set:
1429         lov_fini_sync_set(set);
1430         RETURN(rc);
1431 }
1432
/* All-ones __u64 value, used as the saturation ceiling below. */
#define LOV_U64_MAX ((__u64)~0ULL)

/* Saturating accumulate: add @add to @tot, storing LOV_U64_MAX instead when
 * the sum wraps around.  Both arguments are evaluated more than once, so
 * they must be free of side effects. */
#define LOV_SUM_MAX(tot, add)                                           \
        do {                                                            \
                (tot) = ((tot) + (add) < (tot)) ?                       \
                        LOV_U64_MAX : (tot) + (add);                    \
        } while (0)
1441
1442 int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success)
1443 {
1444         ENTRY;
1445
1446         if (success) {
1447                 __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov, 0);
1448
1449                 if (osfs->os_files != LOV_U64_MAX)
1450                         do_div(osfs->os_files, expected_stripes);
1451                 if (osfs->os_ffree != LOV_U64_MAX)
1452                         do_div(osfs->os_ffree, expected_stripes);
1453
1454                 spin_lock(&obd->obd_osfs_lock);
1455                 memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
1456                 obd->obd_osfs_age = get_jiffies_64();
1457                 spin_unlock(&obd->obd_osfs_lock);
1458                 RETURN(0);
1459         }
1460
1461         RETURN(-EIO);
1462 }
1463
1464 int lov_fini_statfs_set(struct lov_request_set *set)
1465 {
1466         int rc = 0;
1467         ENTRY;
1468
1469         if (set == NULL)
1470                 RETURN(0);
1471
1472         if (set->set_completes) {
1473                 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
1474                                      set->set_success);
1475         }
1476
1477         if (atomic_dec_and_test(&set->set_refcount))
1478                 lov_finish_set(set);
1479
1480         RETURN(rc);
1481 }
1482
1483 void lov_update_statfs(struct obd_device *obd, struct obd_statfs *osfs,
1484                        struct obd_statfs *lov_sfs, int success)
1485 {
1486         spin_lock(&obd->obd_osfs_lock);
1487         memcpy(&obd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
1488         obd->obd_osfs_age = get_jiffies_64();
1489         spin_unlock(&obd->obd_osfs_lock);
1490
1491         if (success == 0) {
1492                 memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
1493         } else {
1494 #ifdef MIN_DF
1495                 /* Sandia requested that df (and so, statfs) only
1496                    returned minimal available space on
1497                    a single OST, so people would be able to
1498                    write this much data guaranteed. */
1499                 if (osfs->os_bavail > lov_sfs->os_bavail) {
1500                         /* Presumably if new bavail is smaller,
1501                            new bfree is bigger as well */
1502                         osfs->os_bfree = lov_sfs->os_bfree;
1503                         osfs->os_bavail = lov_sfs->os_bavail;
1504                 }
1505 #else
1506                 osfs->os_bfree += lov_sfs->os_bfree;
1507                 osfs->os_bavail += lov_sfs->os_bavail;
1508 #endif
1509                 osfs->os_blocks += lov_sfs->os_blocks;
1510                 /* XXX not sure about this one - depends on policy.
1511                  *   - could be minimum if we always stripe on all OBDs
1512                  *     (but that would be wrong for any other policy,
1513                  *     if one of the OBDs has no more objects left)
1514                  *   - could be sum if we stripe whole objects
1515                  *   - could be average, just to give a nice number
1516                  *
1517                  * To give a "reasonable" (if not wholly accurate)
1518                  * number, we divide the total number of free objects
1519                  * by expected stripe count (watch out for overflow).
1520                  */
1521                 LOV_SUM_MAX(osfs->os_files, lov_sfs->os_files);
1522                 LOV_SUM_MAX(osfs->os_ffree, lov_sfs->os_ffree);
1523         }
1524 }
1525
1526 /* The callback for osc_statfs_async that finilizes a request info when a
1527  * response is recieved. */
1528 static int cb_statfs_update(struct obd_info *oinfo, int rc)
1529 {
1530         struct lov_request *lovreq;
1531         struct obd_statfs *osfs, *lov_sfs;
1532         struct obd_device *obd;
1533         struct lov_obd *lov;
1534         int success;
1535         ENTRY;
1536
1537         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1538         lov = &lovreq->rq_rqset->set_obd->u.lov;
1539         obd = class_exp2obd(lov->lov_tgts[lovreq->rq_idx]->ltd_exp);
1540
1541         osfs = lovreq->rq_rqset->set_oi->oi_osfs;
1542         lov_sfs = oinfo->oi_osfs;
1543
1544         success = lovreq->rq_rqset->set_success;
1545
1546         /* XXX: the same is done in lov_update_common_set, however
1547            lovset->set_exp is not initialized. */
1548         lov_update_set(lovreq->rq_rqset, lovreq, rc);
1549         if (rc) {
1550                 if (rc && !(lov->lov_tgts[lovreq->rq_idx] &&
1551                             lov->lov_tgts[lovreq->rq_idx]->ltd_active))
1552                         rc = 0;
1553                 RETURN(rc);
1554         }
1555
1556         lov_update_statfs(obd, osfs, lov_sfs, success);
1557         qos_update(lov);
1558
1559         RETURN(0);
1560 }
1561
1562 int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
1563                         struct lov_request_set **reqset)
1564 {
1565         struct lov_request_set *set;
1566         struct lov_obd *lov = &obd->u.lov;
1567         int rc = 0, i;
1568         ENTRY;
1569
1570         OBD_ALLOC(set, sizeof(*set));
1571         if (set == NULL)
1572                 RETURN(-ENOMEM);
1573         lov_init_set(set);
1574
1575         set->set_obd = obd;
1576         set->set_oi = oinfo;
1577
1578         /* We only get block data from the OBD */
1579         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1580                 struct lov_request *req;
1581
1582                 if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_active) {
1583                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
1584                         continue;
1585                 }
1586
1587                 OBD_ALLOC(req, sizeof(*req));
1588                 if (req == NULL)
1589                         GOTO(out_set, rc = -ENOMEM);
1590
1591                 OBD_ALLOC(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs));
1592                 if (req->rq_oi.oi_osfs == NULL) {
1593                         OBD_FREE(req, sizeof(*req));
1594                         GOTO(out_set, rc = -ENOMEM);
1595                 }
1596
1597                 req->rq_idx = i;
1598                 req->rq_oi.oi_cb_up = cb_statfs_update;
1599                 req->rq_rqset = set;
1600
1601                 lov_set_add_req(req, set);
1602         }
1603         if (!set->set_count)
1604                 GOTO(out_set, rc = -EIO);
1605         *reqset = set;
1606         RETURN(rc);
1607 out_set:
1608         lov_fini_statfs_set(set);
1609         RETURN(rc);
1610 }