Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / lov / lov_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_LOV
41
42 #ifdef __KERNEL__
43 #include <libcfs/libcfs.h>
44 #else
45 #include <liblustre.h>
46 #endif
47
48 #include <obd_class.h>
49 #include <obd_lov.h>
50 #include <lustre/lustre_idl.h>
51
52 #include "lov_internal.h"
53
54 static void lov_init_set(struct lov_request_set *set)
55 {
56         set->set_count = 0;
57         set->set_completes = 0;
58         set->set_success = 0;
59         set->set_cookies = 0;
60         CFS_INIT_LIST_HEAD(&set->set_list);
61         atomic_set(&set->set_refcount, 1);
62         cfs_waitq_init(&set->set_waitq);
63 }
64
65 static void lov_finish_set(struct lov_request_set *set)
66 {
67         struct list_head *pos, *n;
68         ENTRY;
69
70         LASSERT(set);
71         list_for_each_safe(pos, n, &set->set_list) {
72                 struct lov_request *req = list_entry(pos, struct lov_request,
73                                                      rq_link);
74                 list_del_init(&req->rq_link);
75
76                 if (req->rq_oi.oi_oa)
77                         OBDO_FREE(req->rq_oi.oi_oa);
78                 if (req->rq_oi.oi_md)
79                         OBD_FREE(req->rq_oi.oi_md, req->rq_buflen);
80                 if (req->rq_oi.oi_osfs)
81                         OBD_FREE(req->rq_oi.oi_osfs,
82                                  sizeof(*req->rq_oi.oi_osfs));
83                 OBD_FREE(req, sizeof(*req));
84         }
85
86         if (set->set_pga) {
87                 int len = set->set_oabufs * sizeof(*set->set_pga);
88                 OBD_FREE(set->set_pga, len);
89         }
90         if (set->set_lockh)
91                 lov_llh_put(set->set_lockh);
92
93         OBD_FREE(set, sizeof(*set));
94         EXIT;
95 }
96
97 int lov_finished_set(struct lov_request_set *set)
98 {
99         CDEBUG(D_INFO, "check set %d/%d\n", set->set_completes,
100                set->set_count);
101         return set->set_completes == set->set_count;
102 }
103
104
105 void lov_update_set(struct lov_request_set *set,
106                     struct lov_request *req, int rc)
107 {
108         req->rq_complete = 1;
109         req->rq_rc = rc;
110
111         set->set_completes++;
112         if (rc == 0)
113                 set->set_success++;
114
115         cfs_waitq_signal(&set->set_waitq);
116 }
117
118 int lov_update_common_set(struct lov_request_set *set,
119                           struct lov_request *req, int rc)
120 {
121         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
122         ENTRY;
123
124         lov_update_set(set, req, rc);
125
126         /* grace error on inactive ost */
127         if (rc && !(lov->lov_tgts[req->rq_idx] &&
128                     lov->lov_tgts[req->rq_idx]->ltd_active))
129                 rc = 0;
130
131         /* FIXME in raid1 regime, should return 0 */
132         RETURN(rc);
133 }
134
135 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
136 {
137         list_add_tail(&req->rq_link, &set->set_list);
138         set->set_count++;
139         req->rq_rqset = set;
140 }
141
142 extern void osc_update_enqueue(struct lustre_handle *lov_lockhp,
143                                struct lov_oinfo *loi, int flags,
144                                struct ost_lvb *lvb, __u32 mode, int rc);
145
146 static int lov_update_enqueue_lov(struct obd_export *exp,
147                                   struct lustre_handle *lov_lockhp,
148                                   struct lov_oinfo *loi, int flags, int idx,
149                                   __u64 oid, int rc)
150 {
151         struct lov_obd *lov = &exp->exp_obd->u.lov;
152
153         if (rc != ELDLM_OK &&
154             !(rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT))) {
155                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
156                 if (lov->lov_tgts[idx] && lov->lov_tgts[idx]->ltd_active) {
157                         /* -EUSERS used by OST to report file contention */
158                         if (rc != -EINTR && rc != -EUSERS)
159                                 CERROR("enqueue objid "LPX64" subobj "
160                                        LPX64" on OST idx %d: rc %d\n",
161                                        oid, loi->loi_id, loi->loi_ost_idx, rc);
162                 } else
163                         rc = ELDLM_OK;
164         }
165         return rc;
166 }
167
168 int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc)
169 {
170         struct lov_request_set *set = req->rq_rqset;
171         struct lustre_handle *lov_lockhp;
172         struct obd_info *oi = set->set_oi;
173         struct lov_oinfo *loi;
174         ENTRY;
175
176         LASSERT(oi != NULL);
177
178         lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
179         loi = oi->oi_md->lsm_oinfo[req->rq_stripe];
180
181         /* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set
182          * and that copy can be arbitrarily out of date.
183          *
184          * The LOV API is due for a serious rewriting anyways, and this
185          * can be addressed then. */
186
187         lov_stripe_lock(oi->oi_md);
188         osc_update_enqueue(lov_lockhp, loi, oi->oi_flags,
189                            &req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb, mode, rc);
190         if (rc == ELDLM_LOCK_ABORTED && (oi->oi_flags & LDLM_FL_HAS_INTENT))
191                 memset(lov_lockhp, 0, sizeof *lov_lockhp);
192         rc = lov_update_enqueue_lov(set->set_exp, lov_lockhp, loi, oi->oi_flags,
193                                     req->rq_idx, oi->oi_md->lsm_object_id, rc);
194         lov_stripe_unlock(oi->oi_md);
195         lov_update_set(set, req, rc);
196         RETURN(rc);
197 }
198
199 /* The callback for osc_enqueue that updates lov info for every OSC request. */
200 static int cb_update_enqueue(void *cookie, int rc)
201 {
202         struct obd_info *oinfo = cookie;
203         struct ldlm_enqueue_info *einfo;
204         struct lov_request *lovreq;
205
206         lovreq = container_of(oinfo, struct lov_request, rq_oi);
207         einfo = lovreq->rq_rqset->set_ei;
208         return lov_update_enqueue_set(lovreq, einfo->ei_mode, rc);
209 }
210
211 static int enqueue_done(struct lov_request_set *set, __u32 mode)
212 {
213         struct lov_request *req;
214         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
215         int rc = 0;
216         ENTRY;
217
218         /* enqueue/match success, just return */
219         if (set->set_completes && set->set_completes == set->set_success)
220                 RETURN(0);
221
222         /* cancel enqueued/matched locks */
223         list_for_each_entry(req, &set->set_list, rq_link) {
224                 struct lustre_handle *lov_lockhp;
225
226                 if (!req->rq_complete || req->rq_rc)
227                         continue;
228
229                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
230                 LASSERT(lov_lockhp);
231                 if (!lustre_handle_is_used(lov_lockhp))
232                         continue;
233
234                 rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp,
235                                 req->rq_oi.oi_md, mode, lov_lockhp);
236                 if (rc && lov->lov_tgts[req->rq_idx] &&
237                     lov->lov_tgts[req->rq_idx]->ltd_active)
238                         CERROR("cancelling obdjid "LPX64" on OST "
239                                "idx %d error: rc = %d\n",
240                                req->rq_oi.oi_md->lsm_object_id,
241                                req->rq_idx, rc);
242         }
243         if (set->set_lockh)
244                 lov_llh_put(set->set_lockh);
245         RETURN(rc);
246 }
247
248 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc,
249                          struct ptlrpc_request_set *rqset)
250 {
251         int ret = 0;
252         ENTRY;
253
254         if (set == NULL)
255                 RETURN(0);
256         LASSERT(set->set_exp);
257         /* Do enqueue_done only for sync requests and if any request
258          * succeeded. */
259         if (!rqset) {
260                 if (rc)
261                         set->set_completes = 0;
262                 ret = enqueue_done(set, mode);
263         } else if (set->set_lockh)
264                 lov_llh_put(set->set_lockh);
265
266         if (atomic_dec_and_test(&set->set_refcount))
267                 lov_finish_set(set);
268
269         RETURN(rc ? rc : ret);
270 }
271
272 int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
273                          struct ldlm_enqueue_info *einfo,
274                          struct lov_request_set **reqset)
275 {
276         struct lov_obd *lov = &exp->exp_obd->u.lov;
277         struct lov_request_set *set;
278         int i, rc = 0;
279         ENTRY;
280
281         OBD_ALLOC(set, sizeof(*set));
282         if (set == NULL)
283                 RETURN(-ENOMEM);
284         lov_init_set(set);
285
286         set->set_exp = exp;
287         set->set_oi = oinfo;
288         set->set_ei = einfo;
289         set->set_lockh = lov_llh_new(oinfo->oi_md);
290         if (set->set_lockh == NULL)
291                 GOTO(out_set, rc = -ENOMEM);
292         oinfo->oi_lockh->cookie = set->set_lockh->llh_handle.h_cookie;
293
294         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
295                 struct lov_oinfo *loi;
296                 struct lov_request *req;
297                 obd_off start, end;
298
299                 loi = oinfo->oi_md->lsm_oinfo[i];
300                 if (!lov_stripe_intersects(oinfo->oi_md, i,
301                                            oinfo->oi_policy.l_extent.start,
302                                            oinfo->oi_policy.l_extent.end,
303                                            &start, &end))
304                         continue;
305
306                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
307                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
308                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
309                         continue;
310                 }
311
312                 OBD_ALLOC(req, sizeof(*req));
313                 if (req == NULL)
314                         GOTO(out_set, rc = -ENOMEM);
315
316                 req->rq_buflen = sizeof(*req->rq_oi.oi_md) +
317                         sizeof(struct lov_oinfo *) +
318                         sizeof(struct lov_oinfo);
319                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
320                 if (req->rq_oi.oi_md == NULL) {
321                         OBD_FREE(req, sizeof(*req));
322                         GOTO(out_set, rc = -ENOMEM);
323                 }
324                 req->rq_oi.oi_md->lsm_oinfo[0] =
325                         ((void *)req->rq_oi.oi_md) + sizeof(*req->rq_oi.oi_md) +
326                         sizeof(struct lov_oinfo *);
327
328                 /* Set lov request specific parameters. */
329                 req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;
330                 req->rq_oi.oi_cb_up = cb_update_enqueue;
331                 req->rq_oi.oi_flags = oinfo->oi_flags;
332
333                 LASSERT(req->rq_oi.oi_lockh);
334
335                 req->rq_oi.oi_policy.l_extent.gid =
336                         oinfo->oi_policy.l_extent.gid;
337                 req->rq_oi.oi_policy.l_extent.start = start;
338                 req->rq_oi.oi_policy.l_extent.end = end;
339
340                 req->rq_idx = loi->loi_ost_idx;
341                 req->rq_stripe = i;
342
343                 /* XXX LOV STACKING: submd should be from the subobj */
344                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
345                 req->rq_oi.oi_md->lsm_object_gr = oinfo->oi_md->lsm_object_gr;
346                 req->rq_oi.oi_md->lsm_stripe_count = 0;
347                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms_valid =
348                         loi->loi_kms_valid;
349                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms = loi->loi_kms;
350                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb = loi->loi_lvb;
351
352                 lov_set_add_req(req, set);
353         }
354         if (!set->set_count)
355                 GOTO(out_set, rc = -EIO);
356         *reqset = set;
357         RETURN(0);
358 out_set:
359         lov_fini_enqueue_set(set, einfo->ei_mode, rc, NULL);
360         RETURN(rc);
361 }
362
363 int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
364                          int rc)
365 {
366         int ret = rc;
367         ENTRY;
368
369         if (rc > 0)
370                 ret = 0;
371         else if (rc == 0)
372                 ret = 1;
373         lov_update_set(set, req, ret);
374         RETURN(rc);
375 }
376
377 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
378 {
379         int rc = 0;
380         ENTRY;
381
382         if (set == NULL)
383                 RETURN(0);
384         LASSERT(set->set_exp);
385         rc = enqueue_done(set, mode);
386         if ((set->set_count == set->set_success) &&
387             (flags & LDLM_FL_TEST_LOCK))
388                 lov_llh_put(set->set_lockh);
389
390         if (atomic_dec_and_test(&set->set_refcount))
391                 lov_finish_set(set);
392
393         RETURN(rc);
394 }
395
396 int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo,
397                        struct lov_stripe_md *lsm, ldlm_policy_data_t *policy,
398                        __u32 mode, struct lustre_handle *lockh,
399                        struct lov_request_set **reqset)
400 {
401         struct lov_obd *lov = &exp->exp_obd->u.lov;
402         struct lov_request_set *set;
403         int i, rc = 0;
404         ENTRY;
405
406         OBD_ALLOC(set, sizeof(*set));
407         if (set == NULL)
408                 RETURN(-ENOMEM);
409         lov_init_set(set);
410
411         set->set_exp = exp;
412         set->set_oi = oinfo;
413         set->set_oi->oi_md = lsm;
414         set->set_lockh = lov_llh_new(lsm);
415         if (set->set_lockh == NULL)
416                 GOTO(out_set, rc = -ENOMEM);
417         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
418
419         for (i = 0; i < lsm->lsm_stripe_count; i++){
420                 struct lov_oinfo *loi;
421                 struct lov_request *req;
422                 obd_off start, end;
423
424                 loi = lsm->lsm_oinfo[i];
425                 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
426                                            policy->l_extent.end, &start, &end))
427                         continue;
428
429                 /* FIXME raid1 should grace this error */
430                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
431                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
432                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
433                         GOTO(out_set, rc = -EIO);
434                 }
435
436                 OBD_ALLOC(req, sizeof(*req));
437                 if (req == NULL)
438                         GOTO(out_set, rc = -ENOMEM);
439
440                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
441                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
442                 if (req->rq_oi.oi_md == NULL) {
443                         OBD_FREE(req, sizeof(*req));
444                         GOTO(out_set, rc = -ENOMEM);
445                 }
446
447                 req->rq_oi.oi_policy.l_extent.start = start;
448                 req->rq_oi.oi_policy.l_extent.end = end;
449                 req->rq_oi.oi_policy.l_extent.gid = policy->l_extent.gid;
450
451                 req->rq_idx = loi->loi_ost_idx;
452                 req->rq_stripe = i;
453
454                 /* XXX LOV STACKING: submd should be from the subobj */
455                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
456                 req->rq_oi.oi_md->lsm_object_gr = lsm->lsm_object_gr;
457                 req->rq_oi.oi_md->lsm_stripe_count = 0;
458
459                 lov_set_add_req(req, set);
460         }
461         if (!set->set_count)
462                 GOTO(out_set, rc = -EIO);
463         *reqset = set;
464         RETURN(rc);
465 out_set:
466         lov_fini_match_set(set, mode, 0);
467         RETURN(rc);
468 }
469
470 int lov_fini_cancel_set(struct lov_request_set *set)
471 {
472         int rc = 0;
473         ENTRY;
474
475         if (set == NULL)
476                 RETURN(0);
477
478         LASSERT(set->set_exp);
479         if (set->set_lockh)
480                 lov_llh_put(set->set_lockh);
481
482         if (atomic_dec_and_test(&set->set_refcount))
483                 lov_finish_set(set);
484
485         RETURN(rc);
486 }
487
488 int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,
489                         struct lov_stripe_md *lsm, __u32 mode,
490                         struct lustre_handle *lockh,
491                         struct lov_request_set **reqset)
492 {
493         struct lov_request_set *set;
494         int i, rc = 0;
495         ENTRY;
496
497         OBD_ALLOC(set, sizeof(*set));
498         if (set == NULL)
499                 RETURN(-ENOMEM);
500         lov_init_set(set);
501
502         set->set_exp = exp;
503         set->set_oi = oinfo;
504         set->set_oi->oi_md = lsm;
505         set->set_lockh = lov_handle2llh(lockh);
506         if (set->set_lockh == NULL) {
507                 CERROR("LOV: invalid lov lock handle %p\n", lockh);
508                 GOTO(out_set, rc = -EINVAL);
509         }
510         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
511
512         for (i = 0; i < lsm->lsm_stripe_count; i++){
513                 struct lov_request *req;
514                 struct lustre_handle *lov_lockhp;
515                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
516
517                 lov_lockhp = set->set_lockh->llh_handles + i;
518                 if (!lustre_handle_is_used(lov_lockhp)) {
519                         CDEBUG(D_RPCTRACE,"lov idx %d subobj "LPX64" no lock\n",
520                                loi->loi_ost_idx, loi->loi_id);
521                         continue;
522                 }
523
524                 OBD_ALLOC(req, sizeof(*req));
525                 if (req == NULL)
526                         GOTO(out_set, rc = -ENOMEM);
527
528                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
529                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
530                 if (req->rq_oi.oi_md == NULL) {
531                         OBD_FREE(req, sizeof(*req));
532                         GOTO(out_set, rc = -ENOMEM);
533                 }
534
535                 req->rq_idx = loi->loi_ost_idx;
536                 req->rq_stripe = i;
537
538                 /* XXX LOV STACKING: submd should be from the subobj */
539                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
540                 req->rq_oi.oi_md->lsm_object_gr = lsm->lsm_object_gr;
541                 req->rq_oi.oi_md->lsm_stripe_count = 0;
542
543                 lov_set_add_req(req, set);
544         }
545         if (!set->set_count)
546                 GOTO(out_set, rc = -EIO);
547         *reqset = set;
548         RETURN(rc);
549 out_set:
550         lov_fini_cancel_set(set);
551         RETURN(rc);
552 }
553
554 static int create_done(struct obd_export *exp, struct lov_request_set *set,
555                        struct lov_stripe_md **lsmp)
556 {
557         struct lov_obd *lov = &exp->exp_obd->u.lov;
558         struct obd_trans_info *oti = set->set_oti;
559         struct obdo *src_oa = set->set_oi->oi_oa;
560         struct lov_request *req;
561         struct obdo *ret_oa = NULL;
562         int attrset = 0, rc = 0;
563         ENTRY;
564
565         LASSERT(set->set_completes);
566
567         /* try alloc objects on other osts if osc_create fails for
568          * exceptions: RPC failure, ENOSPC, etc */
569         if (set->set_count != set->set_success) {
570                 list_for_each_entry (req, &set->set_list, rq_link) {
571                         if (req->rq_rc == 0)
572                                 continue;
573
574                         set->set_completes--;
575                         req->rq_complete = 0;
576
577                         rc = qos_remedy_create(set, req);
578                         lov_update_create_set(set, req, rc);
579                 }
580         }
581
582         /* no successful creates */
583         if (set->set_success == 0)
584                 GOTO(cleanup, rc);
585
586         if (set->set_count != set->set_success) {
587                 set->set_count = set->set_success;
588                 qos_shrink_lsm(set);
589         }
590
591         OBDO_ALLOC(ret_oa);
592         if (ret_oa == NULL)
593                 GOTO(cleanup, rc = -ENOMEM);
594
595         list_for_each_entry(req, &set->set_list, rq_link) {
596                 if (!req->rq_complete || req->rq_rc)
597                         continue;
598                 lov_merge_attrs(ret_oa, req->rq_oi.oi_oa,
599                                 req->rq_oi.oi_oa->o_valid, set->set_oi->oi_md,
600                                 req->rq_stripe, &attrset);
601         }
602         if (src_oa->o_valid & OBD_MD_FLSIZE &&
603             ret_oa->o_size != src_oa->o_size) {
604                 CERROR("original size "LPU64" isn't new object size "LPU64"\n",
605                        src_oa->o_size, ret_oa->o_size);
606                 LBUG();
607         }
608         ret_oa->o_id = src_oa->o_id;
609         ret_oa->o_gr = src_oa->o_gr;
610         ret_oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP;
611         memcpy(src_oa, ret_oa, sizeof(*src_oa));
612         OBDO_FREE(ret_oa);
613
614         *lsmp = set->set_oi->oi_md;
615         GOTO(done, rc = 0);
616
617 cleanup:
618         list_for_each_entry(req, &set->set_list, rq_link) {
619                 struct obd_export *sub_exp;
620                 int err = 0;
621
622                 if (!req->rq_complete || req->rq_rc)
623                         continue;
624
625                 sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp;
626                 err = obd_destroy(sub_exp, req->rq_oi.oi_oa, NULL, oti, NULL,
627                                   NULL);
628                 if (err)
629                         CERROR("Failed to uncreate objid "LPX64" subobj "
630                                LPX64" on OST idx %d: rc = %d\n",
631                                src_oa->o_id, req->rq_oi.oi_oa->o_id,
632                                req->rq_idx, rc);
633         }
634         if (*lsmp == NULL)
635                 obd_free_memmd(exp, &set->set_oi->oi_md);
636 done:
637         if (oti && set->set_cookies) {
638                 oti->oti_logcookies = set->set_cookies;
639                 if (!set->set_cookie_sent) {
640                         oti_free_cookies(oti);
641                         src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
642                 } else {
643                         src_oa->o_valid |= OBD_MD_FLCOOKIE;
644                 }
645         }
646         RETURN(rc);
647 }
648
649 int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp)
650 {
651         int rc = 0;
652         ENTRY;
653
654         if (set == NULL)
655                 RETURN(0);
656         LASSERT(set->set_exp);
657         if (set->set_completes)
658                 rc = create_done(set->set_exp, set, lsmp);
659
660         if (atomic_dec_and_test(&set->set_refcount))
661                 lov_finish_set(set);
662
663         RETURN(rc);
664 }
665
666 int lov_update_create_set(struct lov_request_set *set,
667                           struct lov_request *req, int rc)
668 {
669         struct obd_trans_info *oti = set->set_oti;
670         struct lov_stripe_md *lsm = set->set_oi->oi_md;
671         struct lov_oinfo *loi;
672         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
673         ENTRY;
674
675         req->rq_stripe = set->set_success;
676         loi = lsm->lsm_oinfo[req->rq_stripe];
677
678         if (rc && lov->lov_tgts[req->rq_idx] &&
679             lov->lov_tgts[req->rq_idx]->ltd_active) {
680                 CERROR("error creating fid "LPX64" sub-object"
681                        " on OST idx %d/%d: rc = %d\n",
682                        set->set_oi->oi_oa->o_id, req->rq_idx,
683                        lsm->lsm_stripe_count, rc);
684                 if (rc > 0) {
685                         CERROR("obd_create returned invalid err %d\n", rc);
686                         rc = -EIO;
687                 }
688         }
689         lov_update_set(set, req, rc);
690         if (rc)
691                 RETURN(rc);
692
693         loi->loi_id = req->rq_oi.oi_oa->o_id;
694         loi->loi_gr = req->rq_oi.oi_oa->o_gr;
695         loi->loi_ost_idx = req->rq_idx;
696         CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n",
697                lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
698         loi_init(loi);
699
700         if (oti && set->set_cookies)
701                 ++oti->oti_logcookies;
702         if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE)
703                 set->set_cookie_sent++;
704
705         RETURN(0);
706 }
707
708 int cb_create_update(void *cookie, int rc)
709 {
710         struct obd_info *oinfo = cookie;
711         struct lov_request *lovreq;
712
713         lovreq = container_of(oinfo, struct lov_request, rq_oi);
714         return lov_update_create_set(lovreq->rq_rqset, lovreq, rc);
715 }
716
717
718 int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo,
719                         struct lov_stripe_md **lsmp, struct obdo *src_oa,
720                         struct obd_trans_info *oti,
721                         struct lov_request_set **reqset)
722 {
723         struct lov_request_set *set;
724         int rc = 0;
725         ENTRY;
726
727         OBD_ALLOC(set, sizeof(*set));
728         if (set == NULL)
729                 RETURN(-ENOMEM);
730         lov_init_set(set);
731
732         set->set_exp = exp;
733         set->set_oi = oinfo;
734         set->set_oi->oi_md = *lsmp;
735         set->set_oi->oi_oa = src_oa;
736         set->set_oti = oti;
737
738         rc = qos_prep_create(exp, set);
739         /* qos_shrink_lsm() may have allocated a new lsm */
740         *lsmp = oinfo->oi_md;
741         if (rc)
742                 lov_fini_create_set(set, lsmp);
743         else
744                 *reqset = set;
745         RETURN(rc);
746 }
747
748 static int common_attr_done(struct lov_request_set *set)
749 {
750         struct list_head *pos;
751         struct lov_request *req;
752         struct obdo *tmp_oa;
753         int rc = 0, attrset = 0;
754         ENTRY;
755
756         LASSERT(set->set_oi != NULL);
757
758         if (set->set_oi->oi_oa == NULL)
759                 RETURN(0);
760
761         if (!set->set_success)
762                 RETURN(-EIO);
763
764         OBDO_ALLOC(tmp_oa);
765         if (tmp_oa == NULL)
766                 GOTO(out, rc = -ENOMEM);
767
768         list_for_each (pos, &set->set_list) {
769                 req = list_entry(pos, struct lov_request, rq_link);
770
771                 if (!req->rq_complete || req->rq_rc)
772                         continue;
773                 if (req->rq_oi.oi_oa->o_valid == 0)   /* inactive stripe */
774                         continue;
775                 lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa,
776                                 req->rq_oi.oi_oa->o_valid,
777                                 set->set_oi->oi_md, req->rq_stripe, &attrset);
778         }
779         if (!attrset) {
780                 CERROR("No stripes had valid attrs\n");
781                 rc = -EIO;
782         }
783         tmp_oa->o_id = set->set_oi->oi_oa->o_id;
784         memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
785 out:
786         if (tmp_oa)
787                 OBDO_FREE(tmp_oa);
788         RETURN(rc);
789
790 }
791
792 static int brw_done(struct lov_request_set *set)
793 {
794         struct lov_stripe_md *lsm = set->set_oi->oi_md;
795         struct lov_oinfo     *loi = NULL;
796         struct list_head *pos;
797         struct lov_request *req;
798         ENTRY;
799
800         list_for_each (pos, &set->set_list) {
801                 req = list_entry(pos, struct lov_request, rq_link);
802
803                 if (!req->rq_complete || req->rq_rc)
804                         continue;
805
806                 loi = lsm->lsm_oinfo[req->rq_stripe];
807
808                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS)
809                         loi->loi_lvb.lvb_blocks = req->rq_oi.oi_oa->o_blocks;
810         }
811
812         RETURN(0);
813 }
814
815 int lov_fini_brw_set(struct lov_request_set *set)
816 {
817         int rc = 0;
818         ENTRY;
819
820         if (set == NULL)
821                 RETURN(0);
822         LASSERT(set->set_exp);
823         if (set->set_completes) {
824                 rc = brw_done(set);
825                 /* FIXME update qos data here */
826         }
827         if (atomic_dec_and_test(&set->set_refcount))
828                 lov_finish_set(set);
829
830         RETURN(rc);
831 }
832
833 int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
834                      obd_count oa_bufs, struct brw_page *pga,
835                      struct obd_trans_info *oti,
836                      struct lov_request_set **reqset)
837 {
838         struct {
839                 obd_count       index;
840                 obd_count       count;
841                 obd_count       off;
842         } *info = NULL;
843         struct lov_request_set *set;
844         struct lov_obd *lov = &exp->exp_obd->u.lov;
845         int rc = 0, i, shift;
846         ENTRY;
847
848         OBD_ALLOC(set, sizeof(*set));
849         if (set == NULL)
850                 RETURN(-ENOMEM);
851         lov_init_set(set);
852
853         set->set_exp = exp;
854         set->set_oti = oti;
855         set->set_oi = oinfo;
856         set->set_oabufs = oa_bufs;
857         OBD_ALLOC(set->set_pga, oa_bufs * sizeof(*set->set_pga));
858         if (!set->set_pga)
859                 GOTO(out, rc = -ENOMEM);
860
861         OBD_ALLOC(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
862         if (!info)
863                 GOTO(out, rc = -ENOMEM);
864
865         /* calculate the page count for each stripe */
866         for (i = 0; i < oa_bufs; i++) {
867                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
868                 info[stripe].count++;
869         }
870
871         /* alloc and initialize lov request */
872         shift = 0;
873         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++){
874                 struct lov_oinfo *loi = NULL;
875                 struct lov_request *req;
876
877                 if (info[i].count == 0)
878                         continue;
879
880                 loi = oinfo->oi_md->lsm_oinfo[i];
881                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
882                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
883                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
884                         GOTO(out, rc = -EIO);
885                 }
886
887                 OBD_ALLOC(req, sizeof(*req));
888                 if (req == NULL)
889                         GOTO(out, rc = -ENOMEM);
890
891                 OBDO_ALLOC(req->rq_oi.oi_oa);
892                 if (req->rq_oi.oi_oa == NULL) {
893                         OBD_FREE(req, sizeof(*req));
894                         GOTO(out, rc = -ENOMEM);
895                 }
896
897                 if (oinfo->oi_oa) {
898                         memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
899                                sizeof(*req->rq_oi.oi_oa));
900                 }
901                 req->rq_oi.oi_oa->o_id = loi->loi_id;
902                 req->rq_oi.oi_oa->o_stripe_idx = i;
903
904                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
905                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
906                 if (req->rq_oi.oi_md == NULL) {
907                         OBDO_FREE(req->rq_oi.oi_oa);
908                         OBD_FREE(req, sizeof(*req));
909                         GOTO(out, rc = -ENOMEM);
910                 }
911
912                 req->rq_idx = loi->loi_ost_idx;
913                 req->rq_stripe = i;
914
915                 /* XXX LOV STACKING */
916                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
917                 req->rq_oi.oi_md->lsm_object_gr = oinfo->oi_md->lsm_object_gr;
918                 req->rq_oabufs = info[i].count;
919                 req->rq_pgaidx = shift;
920                 shift += req->rq_oabufs;
921
922                 /* remember the index for sort brw_page array */
923                 info[i].index = req->rq_pgaidx;
924
925                 req->rq_oi.oi_capa = oinfo->oi_capa;
926
927                 lov_set_add_req(req, set);
928         }
929         if (!set->set_count)
930                 GOTO(out, rc = -EIO);
931
932         /* rotate & sort the brw_page array */
933         for (i = 0; i < oa_bufs; i++) {
934                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
935
936                 shift = info[stripe].index + info[stripe].off;
937                 LASSERT(shift < oa_bufs);
938                 set->set_pga[shift] = pga[i];
939                 lov_stripe_offset(oinfo->oi_md, pga[i].off, stripe,
940                                   &set->set_pga[shift].off);
941                 info[stripe].off++;
942         }
943 out:
944         if (info)
945                 OBD_FREE(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
946
947         if (rc == 0)
948                 *reqset = set;
949         else
950                 lov_fini_brw_set(set);
951
952         RETURN(rc);
953 }
954
955 int lov_fini_getattr_set(struct lov_request_set *set)
956 {
957         int rc = 0;
958         ENTRY;
959
960         if (set == NULL)
961                 RETURN(0);
962         LASSERT(set->set_exp);
963         if (set->set_completes)
964                 rc = common_attr_done(set);
965
966         if (atomic_dec_and_test(&set->set_refcount))
967                 lov_finish_set(set);
968
969         RETURN(rc);
970 }
971
972 /* The callback for osc_getattr_async that finilizes a request info when a
973  * response is received. */
974 static int cb_getattr_update(void *cookie, int rc)
975 {
976         struct obd_info *oinfo = cookie;
977         struct lov_request *lovreq;
978         lovreq = container_of(oinfo, struct lov_request, rq_oi);
979         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
980 }
981
982 int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
983                          struct lov_request_set **reqset)
984 {
985         struct lov_request_set *set;
986         struct lov_obd *lov = &exp->exp_obd->u.lov;
987         int rc = 0, i;
988         ENTRY;
989
990         OBD_ALLOC(set, sizeof(*set));
991         if (set == NULL)
992                 RETURN(-ENOMEM);
993         lov_init_set(set);
994
995         set->set_exp = exp;
996         set->set_oi = oinfo;
997
998         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
999                 struct lov_oinfo *loi;
1000                 struct lov_request *req;
1001
1002                 loi = oinfo->oi_md->lsm_oinfo[i];
1003                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1004                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1005                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1006                         continue;
1007                 }
1008
1009                 OBD_ALLOC(req, sizeof(*req));
1010                 if (req == NULL)
1011                         GOTO(out_set, rc = -ENOMEM);
1012
1013                 req->rq_stripe = i;
1014                 req->rq_idx = loi->loi_ost_idx;
1015
1016                 OBDO_ALLOC(req->rq_oi.oi_oa);
1017                 if (req->rq_oi.oi_oa == NULL) {
1018                         OBD_FREE(req, sizeof(*req));
1019                         GOTO(out_set, rc = -ENOMEM);
1020                 }
1021                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1022                        sizeof(*req->rq_oi.oi_oa));
1023                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1024                 req->rq_oi.oi_cb_up = cb_getattr_update;
1025                 req->rq_oi.oi_capa = oinfo->oi_capa;
1026
1027                 lov_set_add_req(req, set);
1028         }
1029         if (!set->set_count)
1030                 GOTO(out_set, rc = -EIO);
1031         *reqset = set;
1032         RETURN(rc);
1033 out_set:
1034         lov_fini_getattr_set(set);
1035         RETURN(rc);
1036 }
1037
1038 int lov_fini_destroy_set(struct lov_request_set *set)
1039 {
1040         ENTRY;
1041
1042         if (set == NULL)
1043                 RETURN(0);
1044         LASSERT(set->set_exp);
1045         if (set->set_completes) {
1046                 /* FIXME update qos data here */
1047         }
1048
1049         if (atomic_dec_and_test(&set->set_refcount))
1050                 lov_finish_set(set);
1051
1052         RETURN(0);
1053 }
1054
1055 int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
1056                          struct obdo *src_oa, struct lov_stripe_md *lsm,
1057                          struct obd_trans_info *oti,
1058                          struct lov_request_set **reqset)
1059 {
1060         struct lov_request_set *set;
1061         struct lov_obd *lov = &exp->exp_obd->u.lov;
1062         int rc = 0, i;
1063         ENTRY;
1064
1065         OBD_ALLOC(set, sizeof(*set));
1066         if (set == NULL)
1067                 RETURN(-ENOMEM);
1068         lov_init_set(set);
1069
1070         set->set_exp = exp;
1071         set->set_oi = oinfo;
1072         set->set_oi->oi_md = lsm;
1073         set->set_oi->oi_oa = src_oa;
1074         set->set_oti = oti;
1075         if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
1076                 set->set_cookies = oti->oti_logcookies;
1077
1078         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1079                 struct lov_oinfo *loi;
1080                 struct lov_request *req;
1081
1082                 loi = lsm->lsm_oinfo[i];
1083                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1084                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1085                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1086                         continue;
1087                 }
1088
1089                 OBD_ALLOC(req, sizeof(*req));
1090                 if (req == NULL)
1091                         GOTO(out_set, rc = -ENOMEM);
1092
1093                 req->rq_stripe = i;
1094                 req->rq_idx = loi->loi_ost_idx;
1095
1096                 OBDO_ALLOC(req->rq_oi.oi_oa);
1097                 if (req->rq_oi.oi_oa == NULL) {
1098                         OBD_FREE(req, sizeof(*req));
1099                         GOTO(out_set, rc = -ENOMEM);
1100                 }
1101                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1102                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1103                 lov_set_add_req(req, set);
1104         }
1105         if (!set->set_count)
1106                 GOTO(out_set, rc = -EIO);
1107         *reqset = set;
1108         RETURN(rc);
1109 out_set:
1110         lov_fini_destroy_set(set);
1111         RETURN(rc);
1112 }
1113
1114 int lov_fini_setattr_set(struct lov_request_set *set)
1115 {
1116         int rc = 0;
1117         ENTRY;
1118
1119         if (set == NULL)
1120                 RETURN(0);
1121         LASSERT(set->set_exp);
1122         if (set->set_completes) {
1123                 rc = common_attr_done(set);
1124                 /* FIXME update qos data here */
1125         }
1126
1127         if (atomic_dec_and_test(&set->set_refcount))
1128                 lov_finish_set(set);
1129         RETURN(rc);
1130 }
1131
1132 int lov_update_setattr_set(struct lov_request_set *set,
1133                            struct lov_request *req, int rc)
1134 {
1135         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1136         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1137         ENTRY;
1138
1139         lov_update_set(set, req, rc);
1140
1141         /* grace error on inactive ost */
1142         if (rc && !(lov->lov_tgts[req->rq_idx] &&
1143                     lov->lov_tgts[req->rq_idx]->ltd_active))
1144                 rc = 0;
1145
1146         if (rc == 0) {
1147                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME)
1148                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_ctime =
1149                                 req->rq_oi.oi_oa->o_ctime;
1150                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME)
1151                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_mtime =
1152                                 req->rq_oi.oi_oa->o_mtime;
1153                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME)
1154                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_atime =
1155                                 req->rq_oi.oi_oa->o_atime;
1156         }
1157
1158         RETURN(rc);
1159 }
1160
1161 /* The callback for osc_setattr_async that finilizes a request info when a
1162  * response is received. */
1163 static int cb_setattr_update(void *cookie, int rc)
1164 {
1165         struct obd_info *oinfo = cookie;
1166         struct lov_request *lovreq;
1167         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1168         return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
1169 }
1170
1171 int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
1172                          struct obd_trans_info *oti,
1173                          struct lov_request_set **reqset)
1174 {
1175         struct lov_request_set *set;
1176         struct lov_obd *lov = &exp->exp_obd->u.lov;
1177         int rc = 0, i;
1178         ENTRY;
1179
1180         OBD_ALLOC(set, sizeof(*set));
1181         if (set == NULL)
1182                 RETURN(-ENOMEM);
1183         lov_init_set(set);
1184
1185         set->set_exp = exp;
1186         set->set_oti = oti;
1187         set->set_oi = oinfo;
1188         if (oti != NULL && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
1189                 set->set_cookies = oti->oti_logcookies;
1190
1191         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1192                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1193                 struct lov_request *req;
1194
1195                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1196                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1197                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1198                         continue;
1199                 }
1200
1201                 OBD_ALLOC(req, sizeof(*req));
1202                 if (req == NULL)
1203                         GOTO(out_set, rc = -ENOMEM);
1204                 req->rq_stripe = i;
1205                 req->rq_idx = loi->loi_ost_idx;
1206
1207                 OBDO_ALLOC(req->rq_oi.oi_oa);
1208                 if (req->rq_oi.oi_oa == NULL) {
1209                         OBD_FREE(req, sizeof(*req));
1210                         GOTO(out_set, rc = -ENOMEM);
1211                 }
1212                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1213                        sizeof(*req->rq_oi.oi_oa));
1214                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1215                 LASSERTF(!(req->rq_oi.oi_oa->o_valid & OBD_MD_FLGROUP) ||
1216                          CHECK_MDS_GROUP(req->rq_oi.oi_oa->o_gr),
1217                          "req->rq_oi.oi_oa->o_valid="LPX64" "
1218                          "req->rq_oi.oi_oa->o_gr="LPU64"\n",
1219                          req->rq_oi.oi_oa->o_valid, req->rq_oi.oi_oa->o_gr);
1220                 req->rq_oi.oi_oa->o_stripe_idx = i;
1221                 req->rq_oi.oi_cb_up = cb_setattr_update;
1222                 req->rq_oi.oi_capa = oinfo->oi_capa;
1223
1224                 if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
1225                         int off = lov_stripe_offset(oinfo->oi_md,
1226                                                     oinfo->oi_oa->o_size, i,
1227                                                     &req->rq_oi.oi_oa->o_size);
1228
1229                         if (off < 0 && req->rq_oi.oi_oa->o_size)
1230                                 req->rq_oi.oi_oa->o_size--;
1231
1232                         CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1233                                i, req->rq_oi.oi_oa->o_size,
1234                                oinfo->oi_oa->o_size);
1235                 }
1236                 lov_set_add_req(req, set);
1237         }
1238         if (!set->set_count)
1239                 GOTO(out_set, rc = -EIO);
1240         *reqset = set;
1241         RETURN(rc);
1242 out_set:
1243         lov_fini_setattr_set(set);
1244         RETURN(rc);
1245 }
1246
1247 int lov_fini_punch_set(struct lov_request_set *set)
1248 {
1249         int rc = 0;
1250         ENTRY;
1251
1252         if (set == NULL)
1253                 RETURN(0);
1254         LASSERT(set->set_exp);
1255         if (set->set_completes) {
1256                 rc = -EIO;
1257                 /* FIXME update qos data here */
1258                 if (set->set_success)
1259                         rc = common_attr_done(set);
1260         }
1261
1262         if (atomic_dec_and_test(&set->set_refcount))
1263                 lov_finish_set(set);
1264
1265         RETURN(rc);
1266 }
1267
1268 int lov_update_punch_set(struct lov_request_set *set,
1269                          struct lov_request *req, int rc)
1270 {
1271         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1272         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1273         ENTRY;
1274
1275         lov_update_set(set, req, rc);
1276
1277         /* grace error on inactive ost */
1278         if (rc && !lov->lov_tgts[req->rq_idx]->ltd_active)
1279                 rc = 0;
1280
1281         if (rc == 0) {
1282                 lov_stripe_lock(lsm);
1283                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS) {
1284                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_blocks =
1285                                 req->rq_oi.oi_oa->o_blocks;
1286                 }
1287
1288                 /* Do we need to update lvb_size here? It needn't because
1289                  * it have been done in ll_truncate(). -jay */
1290                 lov_stripe_unlock(lsm);
1291         }
1292
1293         RETURN(rc);
1294 }
1295
1296 /* The callback for osc_punch that finilizes a request info when a response
1297  * is received. */
1298 static int cb_update_punch(void *cookie, int rc)
1299 {
1300         struct obd_info *oinfo = cookie;
1301         struct lov_request *lovreq;
1302         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1303         return lov_update_punch_set(lovreq->rq_rqset, lovreq, rc);
1304 }
1305
1306 int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo,
1307                        struct obd_trans_info *oti,
1308                        struct lov_request_set **reqset)
1309 {
1310         struct lov_request_set *set;
1311         struct lov_obd *lov = &exp->exp_obd->u.lov;
1312         int rc = 0, i;
1313         ENTRY;
1314
1315         OBD_ALLOC(set, sizeof(*set));
1316         if (set == NULL)
1317                 RETURN(-ENOMEM);
1318         lov_init_set(set);
1319
1320         set->set_oi = oinfo;
1321         set->set_exp = exp;
1322
1323         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1324                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1325                 struct lov_request *req;
1326                 obd_off rs, re;
1327
1328                 if (!lov_stripe_intersects(oinfo->oi_md, i,
1329                                            oinfo->oi_policy.l_extent.start,
1330                                            oinfo->oi_policy.l_extent.end,
1331                                            &rs, &re))
1332                         continue;
1333
1334                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1335                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1336                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1337                         GOTO(out_set, rc = -EIO);
1338                 }
1339
1340                 OBD_ALLOC(req, sizeof(*req));
1341                 if (req == NULL)
1342                         GOTO(out_set, rc = -ENOMEM);
1343                 req->rq_stripe = i;
1344                 req->rq_idx = loi->loi_ost_idx;
1345
1346                 OBDO_ALLOC(req->rq_oi.oi_oa);
1347                 if (req->rq_oi.oi_oa == NULL) {
1348                         OBD_FREE(req, sizeof(*req));
1349                         GOTO(out_set, rc = -ENOMEM);
1350                 }
1351                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1352                        sizeof(*req->rq_oi.oi_oa));
1353                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1354                 req->rq_oi.oi_oa->o_gr = loi->loi_gr;
1355                 req->rq_oi.oi_oa->o_valid |= OBD_MD_FLGROUP;
1356
1357                 req->rq_oi.oi_oa->o_stripe_idx = i;
1358                 req->rq_oi.oi_cb_up = cb_update_punch;
1359
1360                 req->rq_oi.oi_policy.l_extent.start = rs;
1361                 req->rq_oi.oi_policy.l_extent.end = re;
1362                 req->rq_oi.oi_policy.l_extent.gid = -1;
1363
1364                 req->rq_oi.oi_capa = oinfo->oi_capa;
1365
1366                 lov_set_add_req(req, set);
1367         }
1368         if (!set->set_count)
1369                 GOTO(out_set, rc = -EIO);
1370         *reqset = set;
1371         RETURN(rc);
1372 out_set:
1373         lov_fini_punch_set(set);
1374         RETURN(rc);
1375 }
1376
1377 int lov_fini_sync_set(struct lov_request_set *set)
1378 {
1379         int rc = 0;
1380         ENTRY;
1381
1382         if (set == NULL)
1383                 RETURN(0);
1384         LASSERT(set->set_exp);
1385         if (set->set_completes) {
1386                 if (!set->set_success)
1387                         rc = -EIO;
1388                 /* FIXME update qos data here */
1389         }
1390
1391         if (atomic_dec_and_test(&set->set_refcount))
1392                 lov_finish_set(set);
1393
1394         RETURN(rc);
1395 }
1396
1397 int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
1398                       struct obdo *src_oa, struct lov_stripe_md *lsm,
1399                       obd_off start, obd_off end,
1400                       struct lov_request_set **reqset)
1401 {
1402         struct lov_request_set *set;
1403         struct lov_obd *lov = &exp->exp_obd->u.lov;
1404         int rc = 0, i;
1405         ENTRY;
1406
1407         OBD_ALLOC(set, sizeof(*set));
1408         if (set == NULL)
1409                 RETURN(-ENOMEM);
1410         lov_init_set(set);
1411
1412         set->set_exp = exp;
1413         set->set_oi = oinfo;
1414         set->set_oi->oi_md = lsm;
1415         set->set_oi->oi_oa = src_oa;
1416
1417         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1418                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
1419                 struct lov_request *req;
1420                 obd_off rs, re;
1421
1422                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1423                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1424                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1425                         continue;
1426                 }
1427
1428                 if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
1429                         continue;
1430
1431                 OBD_ALLOC(req, sizeof(*req));
1432                 if (req == NULL)
1433                         GOTO(out_set, rc = -ENOMEM);
1434                 req->rq_stripe = i;
1435                 req->rq_idx = loi->loi_ost_idx;
1436
1437                 OBDO_ALLOC(req->rq_oi.oi_oa);
1438                 if (req->rq_oi.oi_oa == NULL) {
1439                         OBD_FREE(req, sizeof(*req));
1440                         GOTO(out_set, rc = -ENOMEM);
1441                 }
1442                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1443                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1444                 req->rq_oi.oi_oa->o_stripe_idx = i;
1445
1446                 req->rq_oi.oi_policy.l_extent.start = rs;
1447                 req->rq_oi.oi_policy.l_extent.end = re;
1448                 req->rq_oi.oi_policy.l_extent.gid = -1;
1449
1450                 lov_set_add_req(req, set);
1451         }
1452         if (!set->set_count)
1453                 GOTO(out_set, rc = -EIO);
1454         *reqset = set;
1455         RETURN(rc);
1456 out_set:
1457         lov_fini_sync_set(set);
1458         RETURN(rc);
1459 }
1460
/* All-ones value of type __u64. */
#define LOV_U64_MAX ((__u64)~0ULL)

/* Saturating accumulate: adds @add to @tot, or pins @tot at LOV_U64_MAX when
 * the sum wraps around.  Both arguments are evaluated more than once, so
 * pass plain lvalues/values without side effects. */
#define LOV_SUM_MAX(tot, add)                                           \
        do {                                                            \
                if ((tot) + (add) >= (tot))                             \
                        (tot) += (add);                                 \
                else                                                    \
                        (tot) = LOV_U64_MAX;                            \
        } while (0)
1469
1470 int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success)
1471 {
1472         ENTRY;
1473
1474         if (success) {
1475                 __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov, 0);
1476
1477                 if (osfs->os_files != LOV_U64_MAX)
1478                         do_div(osfs->os_files, expected_stripes);
1479                 if (osfs->os_ffree != LOV_U64_MAX)
1480                         do_div(osfs->os_ffree, expected_stripes);
1481
1482                 spin_lock(&obd->obd_osfs_lock);
1483                 memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
1484                 obd->obd_osfs_age = cfs_time_current_64();
1485                 spin_unlock(&obd->obd_osfs_lock);
1486                 RETURN(0);
1487         }
1488
1489         RETURN(-EIO);
1490 }
1491
1492 int lov_fini_statfs_set(struct lov_request_set *set)
1493 {
1494         int rc = 0;
1495         ENTRY;
1496
1497         if (set == NULL)
1498                 RETURN(0);
1499
1500         if (set->set_completes) {
1501                 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
1502                                      set->set_success);
1503         }
1504
1505         if (atomic_dec_and_test(&set->set_refcount))
1506                 lov_finish_set(set);
1507
1508         RETURN(rc);
1509 }
1510
1511 void lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs,
1512                        int success)
1513 {
1514         int shift = 0, quit = 0;
1515         __u64 tmp;
1516
1517         if (success == 0) {
1518                 memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
1519         } else {
1520                 if (osfs->os_bsize != lov_sfs->os_bsize) {
1521                         /* assume all block sizes are always powers of 2 */
1522                         /* get the bits difference */
1523                         tmp = osfs->os_bsize | lov_sfs->os_bsize;
1524                         for (shift = 0; shift <= 64; ++shift) {
1525                                 if (tmp & 1) {
1526                                         if (quit)
1527                                                 break;
1528                                         else
1529                                                 quit = 1;
1530                                         shift = 0;
1531                                 }
1532                                 tmp >>= 1;
1533                         }
1534                 }
1535
1536                 if (osfs->os_bsize < lov_sfs->os_bsize) {
1537                         osfs->os_bsize = lov_sfs->os_bsize;
1538
1539                         osfs->os_bfree  >>= shift;
1540                         osfs->os_bavail >>= shift;
1541                         osfs->os_blocks >>= shift;
1542                 } else if (shift != 0) {
1543                         lov_sfs->os_bfree  >>= shift;
1544                         lov_sfs->os_bavail >>= shift;
1545                         lov_sfs->os_blocks >>= shift;
1546                 }
1547 #ifdef MIN_DF
1548                 /* Sandia requested that df (and so, statfs) only
1549                    returned minimal available space on
1550                    a single OST, so people would be able to
1551                    write this much data guaranteed. */
1552                 if (osfs->os_bavail > lov_sfs->os_bavail) {
1553                         /* Presumably if new bavail is smaller,
1554                            new bfree is bigger as well */
1555                         osfs->os_bfree = lov_sfs->os_bfree;
1556                         osfs->os_bavail = lov_sfs->os_bavail;
1557                 }
1558 #else
1559                 osfs->os_bfree += lov_sfs->os_bfree;
1560                 osfs->os_bavail += lov_sfs->os_bavail;
1561 #endif
1562                 osfs->os_blocks += lov_sfs->os_blocks;
1563                 /* XXX not sure about this one - depends on policy.
1564                  *   - could be minimum if we always stripe on all OBDs
1565                  *     (but that would be wrong for any other policy,
1566                  *     if one of the OBDs has no more objects left)
1567                  *   - could be sum if we stripe whole objects
1568                  *   - could be average, just to give a nice number
1569                  *
1570                  * To give a "reasonable" (if not wholly accurate)
1571                  * number, we divide the total number of free objects
1572                  * by expected stripe count (watch out for overflow).
1573                  */
1574                 LOV_SUM_MAX(osfs->os_files, lov_sfs->os_files);
1575                 LOV_SUM_MAX(osfs->os_ffree, lov_sfs->os_ffree);
1576         }
1577 }
1578
1579 /* The callback for osc_statfs_async that finilizes a request info when a
1580  * response is received. */
1581 static int cb_statfs_update(void *cookie, int rc)
1582 {
1583         struct obd_info *oinfo = cookie;
1584         struct lov_request *lovreq;
1585         struct obd_statfs *osfs, *lov_sfs;
1586         struct obd_device *obd;
1587         struct lov_obd *lov;
1588         int success;
1589         ENTRY;
1590
1591         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1592         lov = &lovreq->rq_rqset->set_obd->u.lov;
1593         obd = class_exp2obd(lov->lov_tgts[lovreq->rq_idx]->ltd_exp);
1594
1595         osfs = lovreq->rq_rqset->set_oi->oi_osfs;
1596         lov_sfs = oinfo->oi_osfs;
1597
1598         success = lovreq->rq_rqset->set_success;
1599         /* XXX: the same is done in lov_update_common_set, however
1600            lovset->set_exp is not initialized. */
1601         lov_update_set(lovreq->rq_rqset, lovreq, rc);
1602         if (rc) {
1603                 /* XXX ignore error for disconnected ost ? */
1604                 if (rc && !(lov->lov_tgts[lovreq->rq_idx] &&
1605                             lov->lov_tgts[lovreq->rq_idx]->ltd_active))
1606                         rc = 0;
1607                 GOTO(out, rc);
1608         }
1609
1610         spin_lock(&obd->obd_osfs_lock);
1611         memcpy(&obd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
1612         if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0)
1613                 obd->obd_osfs_age = cfs_time_current_64();
1614         spin_unlock(&obd->obd_osfs_lock);
1615
1616         lov_update_statfs(osfs, lov_sfs, success);
1617         qos_update(lov);
1618 out:
1619         if (lovreq->rq_rqset->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
1620             lov_finished_set(lovreq->rq_rqset)) {
1621                lov_statfs_interpret(NULL, lovreq->rq_rqset,
1622                                     lovreq->rq_rqset->set_success !=
1623                                                   lovreq->rq_rqset->set_count);
1624                qos_statfs_done(lov);
1625         }
1626
1627         RETURN(0);
1628 }
1629
1630 int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
1631                         struct lov_request_set **reqset)
1632 {
1633         struct lov_request_set *set;
1634         struct lov_obd *lov = &obd->u.lov;
1635         int rc = 0, i;
1636         ENTRY;
1637
1638         OBD_ALLOC(set, sizeof(*set));
1639         if (set == NULL)
1640                 RETURN(-ENOMEM);
1641         lov_init_set(set);
1642
1643         set->set_obd = obd;
1644         set->set_oi = oinfo;
1645
1646         /* We only get block data from the OBD */
1647         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1648                 struct lov_request *req;
1649
1650                 if (!lov->lov_tgts[i] || (!lov->lov_tgts[i]->ltd_active
1651                                           && (oinfo->oi_flags & OBD_STATFS_NODELAY))) {
1652                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
1653                         continue;
1654                 }
1655
1656                 OBD_ALLOC(req, sizeof(*req));
1657                 if (req == NULL)
1658                         GOTO(out_set, rc = -ENOMEM);
1659
1660                 OBD_ALLOC(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs));
1661                 if (req->rq_oi.oi_osfs == NULL) {
1662                         OBD_FREE(req, sizeof(*req));
1663                         GOTO(out_set, rc = -ENOMEM);
1664                 }
1665
1666                 req->rq_idx = i;
1667                 req->rq_oi.oi_cb_up = cb_statfs_update;
1668                 req->rq_oi.oi_flags = oinfo->oi_flags;
1669
1670                 lov_set_add_req(req, set);
1671         }
1672         if (!set->set_count)
1673                 GOTO(out_set, rc = -EIO);
1674         *reqset = set;
1675         RETURN(rc);
1676 out_set:
1677         lov_fini_statfs_set(set);
1678         RETURN(rc);
1679 }