Whamcloud - gitweb
672eca2832030b6ffc5d66f455a453eab0790d3e
[fs/lustre-release.git] / lustre / lov / lov_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_LOV
38
39 #ifdef __KERNEL__
40 #include <libcfs/libcfs.h>
41 #else
42 #include <liblustre.h>
43 #endif
44
45 #include <obd_class.h>
46 #include <obd_lov.h>
47 #include <lustre/lustre_idl.h>
48
49 #include "lov_internal.h"
50
51 static void lov_init_set(struct lov_request_set *set)
52 {
53         set->set_count = 0;
54         cfs_atomic_set(&set->set_completes, 0);
55         cfs_atomic_set(&set->set_success, 0);
56         set->set_cookies = 0;
57         CFS_INIT_LIST_HEAD(&set->set_list);
58         cfs_atomic_set(&set->set_refcount, 1);
59         cfs_waitq_init(&set->set_waitq);
60         cfs_spin_lock_init(&set->set_lock);
61 }
62
63 void lov_finish_set(struct lov_request_set *set)
64 {
65         cfs_list_t *pos, *n;
66         ENTRY;
67
68         LASSERT(set);
69         cfs_list_for_each_safe(pos, n, &set->set_list) {
70                 struct lov_request *req = cfs_list_entry(pos,
71                                                          struct lov_request,
72                                                          rq_link);
73                 cfs_list_del_init(&req->rq_link);
74
75                 if (req->rq_oi.oi_oa)
76                         OBDO_FREE(req->rq_oi.oi_oa);
77                 if (req->rq_oi.oi_md)
78                         OBD_FREE_LARGE(req->rq_oi.oi_md, req->rq_buflen);
79                 if (req->rq_oi.oi_osfs)
80                         OBD_FREE(req->rq_oi.oi_osfs,
81                                  sizeof(*req->rq_oi.oi_osfs));
82                 OBD_FREE(req, sizeof(*req));
83         }
84
85         if (set->set_pga) {
86                 int len = set->set_oabufs * sizeof(*set->set_pga);
87                 OBD_FREE_LARGE(set->set_pga, len);
88         }
89         if (set->set_lockh)
90                 lov_llh_put(set->set_lockh);
91
92         OBD_FREE(set, sizeof(*set));
93         EXIT;
94 }
95
96 int lov_finished_set(struct lov_request_set *set)
97 {
98         int completes = cfs_atomic_read(&set->set_completes);
99
100         CDEBUG(D_INFO, "check set %d/%d\n", completes,
101                set->set_count);
102         return completes == set->set_count;
103 }
104
105 void lov_update_set(struct lov_request_set *set,
106                     struct lov_request *req, int rc)
107 {
108         req->rq_complete = 1;
109         req->rq_rc = rc;
110
111         cfs_atomic_inc(&set->set_completes);
112         if (rc == 0)
113                 cfs_atomic_inc(&set->set_success);
114
115         cfs_waitq_signal(&set->set_waitq);
116 }
117
118 int lov_update_common_set(struct lov_request_set *set,
119                           struct lov_request *req, int rc)
120 {
121         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
122         ENTRY;
123
124         lov_update_set(set, req, rc);
125
126         /* grace error on inactive ost */
127         if (rc && !(lov->lov_tgts[req->rq_idx] &&
128                     lov->lov_tgts[req->rq_idx]->ltd_active))
129                 rc = 0;
130
131         /* FIXME in raid1 regime, should return 0 */
132         RETURN(rc);
133 }
134
135 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
136 {
137         cfs_list_add_tail(&req->rq_link, &set->set_list);
138         set->set_count++;
139         req->rq_rqset = set;
140 }
141
142 extern void osc_update_enqueue(struct lustre_handle *lov_lockhp,
143                                struct lov_oinfo *loi, int flags,
144                                struct ost_lvb *lvb, __u32 mode, int rc);
145
146 static int lov_update_enqueue_lov(struct obd_export *exp,
147                                   struct lustre_handle *lov_lockhp,
148                                   struct lov_oinfo *loi, int flags, int idx,
149                                   __u64 oid, int rc)
150 {
151         struct lov_obd *lov = &exp->exp_obd->u.lov;
152
153         if (rc != ELDLM_OK &&
154             !(rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT))) {
155                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
156                 if (lov->lov_tgts[idx] && lov->lov_tgts[idx]->ltd_active) {
157                         /* -EUSERS used by OST to report file contention */
158                         if (rc != -EINTR && rc != -EUSERS)
159                                 CERROR("enqueue objid "LPX64" subobj "
160                                        LPX64" on OST idx %d: rc %d\n",
161                                        oid, loi->loi_id, loi->loi_ost_idx, rc);
162                 } else
163                         rc = ELDLM_OK;
164         }
165         return rc;
166 }
167
168 int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc)
169 {
170         struct lov_request_set *set = req->rq_rqset;
171         struct lustre_handle *lov_lockhp;
172         struct obd_info *oi = set->set_oi;
173         struct lov_oinfo *loi;
174         ENTRY;
175
176         LASSERT(oi != NULL);
177
178         lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
179         loi = oi->oi_md->lsm_oinfo[req->rq_stripe];
180
181         /* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set
182          * and that copy can be arbitrarily out of date.
183          *
184          * The LOV API is due for a serious rewriting anyways, and this
185          * can be addressed then. */
186
187         lov_stripe_lock(oi->oi_md);
188         osc_update_enqueue(lov_lockhp, loi, oi->oi_flags,
189                            &req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb, mode, rc);
190         if (rc == ELDLM_LOCK_ABORTED && (oi->oi_flags & LDLM_FL_HAS_INTENT))
191                 memset(lov_lockhp, 0, sizeof *lov_lockhp);
192         rc = lov_update_enqueue_lov(set->set_exp, lov_lockhp, loi, oi->oi_flags,
193                                     req->rq_idx, oi->oi_md->lsm_object_id, rc);
194         lov_stripe_unlock(oi->oi_md);
195         lov_update_set(set, req, rc);
196         RETURN(rc);
197 }
198
199 /* The callback for osc_enqueue that updates lov info for every OSC request. */
200 static int cb_update_enqueue(void *cookie, int rc)
201 {
202         struct obd_info *oinfo = cookie;
203         struct ldlm_enqueue_info *einfo;
204         struct lov_request *lovreq;
205
206         lovreq = container_of(oinfo, struct lov_request, rq_oi);
207         einfo = lovreq->rq_rqset->set_ei;
208         return lov_update_enqueue_set(lovreq, einfo->ei_mode, rc);
209 }
210
211 static int enqueue_done(struct lov_request_set *set, __u32 mode)
212 {
213         struct lov_request *req;
214         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
215         int completes = cfs_atomic_read(&set->set_completes);
216         int rc = 0;
217         ENTRY;
218
219         /* enqueue/match success, just return */
220         if (completes && completes == cfs_atomic_read(&set->set_success))
221                 RETURN(0);
222
223         /* cancel enqueued/matched locks */
224         cfs_list_for_each_entry(req, &set->set_list, rq_link) {
225                 struct lustre_handle *lov_lockhp;
226
227                 if (!req->rq_complete || req->rq_rc)
228                         continue;
229
230                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
231                 LASSERT(lov_lockhp);
232                 if (!lustre_handle_is_used(lov_lockhp))
233                         continue;
234
235                 rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp,
236                                 req->rq_oi.oi_md, mode, lov_lockhp);
237                 if (rc && lov->lov_tgts[req->rq_idx] &&
238                     lov->lov_tgts[req->rq_idx]->ltd_active)
239                         CERROR("cancelling obdjid "LPX64" on OST "
240                                "idx %d error: rc = %d\n",
241                                req->rq_oi.oi_md->lsm_object_id,
242                                req->rq_idx, rc);
243         }
244         if (set->set_lockh)
245                 lov_llh_put(set->set_lockh);
246         RETURN(rc);
247 }
248
249 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc,
250                          struct ptlrpc_request_set *rqset)
251 {
252         int ret = 0;
253         ENTRY;
254
255         if (set == NULL)
256                 RETURN(0);
257         LASSERT(set->set_exp);
258         /* Do enqueue_done only for sync requests and if any request
259          * succeeded. */
260         if (!rqset) {
261                 if (rc)
262                         cfs_atomic_set(&set->set_completes, 0);
263                 ret = enqueue_done(set, mode);
264         } else if (set->set_lockh)
265                 lov_llh_put(set->set_lockh);
266
267         lov_put_reqset(set);
268
269         RETURN(rc ? rc : ret);
270 }
271
272 static void lov_llh_addref(void *llhp)
273 {
274         struct lov_lock_handles *llh = llhp;
275
276         cfs_atomic_inc(&llh->llh_refcount);
277         CDEBUG(D_INFO, "GETting llh %p : new refcount %d\n", llh,
278                cfs_atomic_read(&llh->llh_refcount));
279 }
280
281 static struct portals_handle_ops lov_handle_ops = {
282         .hop_addref = lov_llh_addref,
283         .hop_free   = NULL,
284 };
285
286 static struct lov_lock_handles *lov_llh_new(struct lov_stripe_md *lsm)
287 {
288         struct lov_lock_handles *llh;
289
290         OBD_ALLOC(llh, sizeof *llh +
291                   sizeof(*llh->llh_handles) * lsm->lsm_stripe_count);
292         if (llh == NULL)
293                 return NULL;
294
295         cfs_atomic_set(&llh->llh_refcount, 2);
296         llh->llh_stripe_count = lsm->lsm_stripe_count;
297         CFS_INIT_LIST_HEAD(&llh->llh_handle.h_link);
298         class_handle_hash(&llh->llh_handle, &lov_handle_ops);
299
300         return llh;
301 }
302
303 int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
304                          struct ldlm_enqueue_info *einfo,
305                          struct lov_request_set **reqset)
306 {
307         struct lov_obd *lov = &exp->exp_obd->u.lov;
308         struct lov_request_set *set;
309         int i, rc = 0;
310         ENTRY;
311
312         OBD_ALLOC(set, sizeof(*set));
313         if (set == NULL)
314                 RETURN(-ENOMEM);
315         lov_init_set(set);
316
317         set->set_exp = exp;
318         set->set_oi = oinfo;
319         set->set_ei = einfo;
320         set->set_lockh = lov_llh_new(oinfo->oi_md);
321         if (set->set_lockh == NULL)
322                 GOTO(out_set, rc = -ENOMEM);
323         oinfo->oi_lockh->cookie = set->set_lockh->llh_handle.h_cookie;
324
325         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
326                 struct lov_oinfo *loi;
327                 struct lov_request *req;
328                 obd_off start, end;
329
330                 loi = oinfo->oi_md->lsm_oinfo[i];
331                 if (!lov_stripe_intersects(oinfo->oi_md, i,
332                                            oinfo->oi_policy.l_extent.start,
333                                            oinfo->oi_policy.l_extent.end,
334                                            &start, &end))
335                         continue;
336
337                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
338                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
339                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
340                         continue;
341                 }
342
343                 OBD_ALLOC(req, sizeof(*req));
344                 if (req == NULL)
345                         GOTO(out_set, rc = -ENOMEM);
346
347                 req->rq_buflen = sizeof(*req->rq_oi.oi_md) +
348                         sizeof(struct lov_oinfo *) +
349                         sizeof(struct lov_oinfo);
350                 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
351                 if (req->rq_oi.oi_md == NULL) {
352                         OBD_FREE(req, sizeof(*req));
353                         GOTO(out_set, rc = -ENOMEM);
354                 }
355                 req->rq_oi.oi_md->lsm_oinfo[0] =
356                         ((void *)req->rq_oi.oi_md) + sizeof(*req->rq_oi.oi_md) +
357                         sizeof(struct lov_oinfo *);
358
359                 /* Set lov request specific parameters. */
360                 req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;
361                 req->rq_oi.oi_cb_up = cb_update_enqueue;
362                 req->rq_oi.oi_flags = oinfo->oi_flags;
363
364                 LASSERT(req->rq_oi.oi_lockh);
365
366                 req->rq_oi.oi_policy.l_extent.gid =
367                         oinfo->oi_policy.l_extent.gid;
368                 req->rq_oi.oi_policy.l_extent.start = start;
369                 req->rq_oi.oi_policy.l_extent.end = end;
370
371                 req->rq_idx = loi->loi_ost_idx;
372                 req->rq_stripe = i;
373
374                 /* XXX LOV STACKING: submd should be from the subobj */
375                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
376                 req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq;
377                 req->rq_oi.oi_md->lsm_stripe_count = 0;
378                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms_valid =
379                         loi->loi_kms_valid;
380                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms = loi->loi_kms;
381                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb = loi->loi_lvb;
382
383                 lov_set_add_req(req, set);
384         }
385         if (!set->set_count)
386                 GOTO(out_set, rc = -EIO);
387         *reqset = set;
388         RETURN(0);
389 out_set:
390         lov_fini_enqueue_set(set, einfo->ei_mode, rc, NULL);
391         RETURN(rc);
392 }
393
394 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
395 {
396         int rc = 0;
397         ENTRY;
398
399         if (set == NULL)
400                 RETURN(0);
401         LASSERT(set->set_exp);
402         rc = enqueue_done(set, mode);
403         if ((set->set_count == cfs_atomic_read(&set->set_success)) &&
404             (flags & LDLM_FL_TEST_LOCK))
405                 lov_llh_put(set->set_lockh);
406
407         lov_put_reqset(set);
408
409         RETURN(rc);
410 }
411
412 int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo,
413                        struct lov_stripe_md *lsm, ldlm_policy_data_t *policy,
414                        __u32 mode, struct lustre_handle *lockh,
415                        struct lov_request_set **reqset)
416 {
417         struct lov_obd *lov = &exp->exp_obd->u.lov;
418         struct lov_request_set *set;
419         int i, rc = 0;
420         ENTRY;
421
422         OBD_ALLOC(set, sizeof(*set));
423         if (set == NULL)
424                 RETURN(-ENOMEM);
425         lov_init_set(set);
426
427         set->set_exp = exp;
428         set->set_oi = oinfo;
429         set->set_oi->oi_md = lsm;
430         set->set_lockh = lov_llh_new(lsm);
431         if (set->set_lockh == NULL)
432                 GOTO(out_set, rc = -ENOMEM);
433         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
434
435         for (i = 0; i < lsm->lsm_stripe_count; i++){
436                 struct lov_oinfo *loi;
437                 struct lov_request *req;
438                 obd_off start, end;
439
440                 loi = lsm->lsm_oinfo[i];
441                 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
442                                            policy->l_extent.end, &start, &end))
443                         continue;
444
445                 /* FIXME raid1 should grace this error */
446                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
447                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
448                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
449                         GOTO(out_set, rc = -EIO);
450                 }
451
452                 OBD_ALLOC(req, sizeof(*req));
453                 if (req == NULL)
454                         GOTO(out_set, rc = -ENOMEM);
455
456                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
457                 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
458                 if (req->rq_oi.oi_md == NULL) {
459                         OBD_FREE(req, sizeof(*req));
460                         GOTO(out_set, rc = -ENOMEM);
461                 }
462
463                 req->rq_oi.oi_policy.l_extent.start = start;
464                 req->rq_oi.oi_policy.l_extent.end = end;
465                 req->rq_oi.oi_policy.l_extent.gid = policy->l_extent.gid;
466
467                 req->rq_idx = loi->loi_ost_idx;
468                 req->rq_stripe = i;
469
470                 /* XXX LOV STACKING: submd should be from the subobj */
471                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
472                 req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq;
473                 req->rq_oi.oi_md->lsm_stripe_count = 0;
474
475                 lov_set_add_req(req, set);
476         }
477         if (!set->set_count)
478                 GOTO(out_set, rc = -EIO);
479         *reqset = set;
480         RETURN(rc);
481 out_set:
482         lov_fini_match_set(set, mode, 0);
483         RETURN(rc);
484 }
485
486 int lov_fini_cancel_set(struct lov_request_set *set)
487 {
488         int rc = 0;
489         ENTRY;
490
491         if (set == NULL)
492                 RETURN(0);
493
494         LASSERT(set->set_exp);
495         if (set->set_lockh)
496                 lov_llh_put(set->set_lockh);
497
498         lov_put_reqset(set);
499
500         RETURN(rc);
501 }
502
503 int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,
504                         struct lov_stripe_md *lsm, __u32 mode,
505                         struct lustre_handle *lockh,
506                         struct lov_request_set **reqset)
507 {
508         struct lov_request_set *set;
509         int i, rc = 0;
510         ENTRY;
511
512         OBD_ALLOC(set, sizeof(*set));
513         if (set == NULL)
514                 RETURN(-ENOMEM);
515         lov_init_set(set);
516
517         set->set_exp = exp;
518         set->set_oi = oinfo;
519         set->set_oi->oi_md = lsm;
520         set->set_lockh = lov_handle2llh(lockh);
521         if (set->set_lockh == NULL) {
522                 CERROR("LOV: invalid lov lock handle %p\n", lockh);
523                 GOTO(out_set, rc = -EINVAL);
524         }
525         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
526
527         for (i = 0; i < lsm->lsm_stripe_count; i++){
528                 struct lov_request *req;
529                 struct lustre_handle *lov_lockhp;
530                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
531
532                 lov_lockhp = set->set_lockh->llh_handles + i;
533                 if (!lustre_handle_is_used(lov_lockhp)) {
534                         CDEBUG(D_INFO, "lov idx %d subobj "LPX64" no lock\n",
535                                loi->loi_ost_idx, loi->loi_id);
536                         continue;
537                 }
538
539                 OBD_ALLOC(req, sizeof(*req));
540                 if (req == NULL)
541                         GOTO(out_set, rc = -ENOMEM);
542
543                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
544                 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
545                 if (req->rq_oi.oi_md == NULL) {
546                         OBD_FREE(req, sizeof(*req));
547                         GOTO(out_set, rc = -ENOMEM);
548                 }
549
550                 req->rq_idx = loi->loi_ost_idx;
551                 req->rq_stripe = i;
552
553                 /* XXX LOV STACKING: submd should be from the subobj */
554                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
555                 req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq;
556                 req->rq_oi.oi_md->lsm_stripe_count = 0;
557
558                 lov_set_add_req(req, set);
559         }
560         if (!set->set_count)
561                 GOTO(out_set, rc = -EIO);
562         *reqset = set;
563         RETURN(rc);
564 out_set:
565         lov_fini_cancel_set(set);
566         RETURN(rc);
567 }
568
569 static int lov_update_create_set(struct lov_request_set *set,
570                                  struct lov_request *req, int rc)
571 {
572         struct obd_trans_info *oti = set->set_oti;
573         struct lov_stripe_md *lsm = set->set_oi->oi_md;
574         struct lov_oinfo *loi;
575         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
576         ENTRY;
577
578         if (rc && lov->lov_tgts[req->rq_idx] &&
579             lov->lov_tgts[req->rq_idx]->ltd_active) {
580                 /* Pre-creating objects may timeout via -ETIMEDOUT or
581                  * -ENOTCONN both are always non-critical events. */
582                 CDEBUG(rc == -ETIMEDOUT || rc == -ENOTCONN ? D_HA : D_ERROR,
583                        "error creating fid "LPX64" sub-object "
584                        "on OST idx %d/%d: rc = %d\n",
585                        set->set_oi->oi_oa->o_id, req->rq_idx,
586                        lsm->lsm_stripe_count, rc);
587                 if (rc > 0) {
588                         CERROR("obd_create returned invalid err %d\n", rc);
589                         rc = -EIO;
590                 }
591         }
592
593         cfs_spin_lock(&set->set_lock);
594         req->rq_stripe = cfs_atomic_read(&set->set_success);
595         loi = lsm->lsm_oinfo[req->rq_stripe];
596
597
598         if (rc) {
599                 lov_update_set(set, req, rc);
600                 cfs_spin_unlock(&set->set_lock);
601                 RETURN(rc);
602         }
603
604         loi->loi_id = req->rq_oi.oi_oa->o_id;
605         loi->loi_seq = req->rq_oi.oi_oa->o_seq;
606         loi->loi_ost_idx = req->rq_idx;
607         loi_init(loi);
608
609         if (oti && set->set_cookies)
610                 ++oti->oti_logcookies;
611         if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE)
612                 set->set_cookie_sent++;
613
614         lov_update_set(set, req, rc);
615         cfs_spin_unlock(&set->set_lock);
616
617         CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n",
618                lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
619         RETURN(rc);
620 }
621
622 static int create_done(struct obd_export *exp, struct lov_request_set *set,
623                        struct lov_stripe_md **lsmp)
624 {
625         struct lov_obd *lov = &exp->exp_obd->u.lov;
626         struct obd_trans_info *oti = set->set_oti;
627         struct obdo *src_oa = set->set_oi->oi_oa;
628         struct lov_request *req;
629         struct obdo *ret_oa = NULL;
630         int success, attrset = 0, rc = 0;
631         ENTRY;
632
633         LASSERT(cfs_atomic_read(&set->set_completes));
634
635         /* try alloc objects on other osts if osc_create fails for
636          * exceptions: RPC failure, ENOSPC, etc */
637         if (set->set_count != cfs_atomic_read(&set->set_success)) {
638                 cfs_list_for_each_entry (req, &set->set_list, rq_link) {
639                         if (req->rq_rc == 0)
640                                 continue;
641
642                         cfs_atomic_dec(&set->set_completes);
643                         req->rq_complete = 0;
644
645                         rc = qos_remedy_create(set, req);
646                         lov_update_create_set(set, req, rc);
647                 }
648         }
649
650         success = cfs_atomic_read(&set->set_success);
651         /* no successful creates */
652         if (success == 0)
653                 GOTO(cleanup, rc);
654
655         if (set->set_count != success) {
656                 set->set_count = success;
657                 qos_shrink_lsm(set);
658         }
659
660         OBDO_ALLOC(ret_oa);
661         if (ret_oa == NULL)
662                 GOTO(cleanup, rc = -ENOMEM);
663
664         cfs_list_for_each_entry(req, &set->set_list, rq_link) {
665                 if (!req->rq_complete || req->rq_rc)
666                         continue;
667                 lov_merge_attrs(ret_oa, req->rq_oi.oi_oa,
668                                 req->rq_oi.oi_oa->o_valid, set->set_oi->oi_md,
669                                 req->rq_stripe, &attrset);
670         }
671         if (src_oa->o_valid & OBD_MD_FLSIZE &&
672             ret_oa->o_size != src_oa->o_size) {
673                 CERROR("original size "LPU64" isn't new object size "LPU64"\n",
674                        src_oa->o_size, ret_oa->o_size);
675                 LBUG();
676         }
677         ret_oa->o_id = src_oa->o_id;
678         ret_oa->o_seq = src_oa->o_seq;
679         ret_oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP;
680         memcpy(src_oa, ret_oa, sizeof(*src_oa));
681         OBDO_FREE(ret_oa);
682
683         *lsmp = set->set_oi->oi_md;
684         GOTO(done, rc = 0);
685
686 cleanup:
687         cfs_list_for_each_entry(req, &set->set_list, rq_link) {
688                 struct obd_export *sub_exp;
689                 int err = 0;
690
691                 if (!req->rq_complete || req->rq_rc)
692                         continue;
693
694                 sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp;
695                 err = obd_destroy(NULL, sub_exp, req->rq_oi.oi_oa, NULL, oti,
696                                   NULL, NULL);
697                 if (err)
698                         CERROR("Failed to uncreate objid "LPX64" subobj "
699                                LPX64" on OST idx %d: rc = %d\n",
700                                src_oa->o_id, req->rq_oi.oi_oa->o_id,
701                                req->rq_idx, rc);
702         }
703         if (*lsmp == NULL)
704                 obd_free_memmd(exp, &set->set_oi->oi_md);
705 done:
706         if (oti && set->set_cookies) {
707                 oti->oti_logcookies = set->set_cookies;
708                 if (!set->set_cookie_sent) {
709                         oti_free_cookies(oti);
710                         src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
711                 } else {
712                         src_oa->o_valid |= OBD_MD_FLCOOKIE;
713                 }
714         }
715         RETURN(rc);
716 }
717
718 int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp)
719 {
720         int rc = 0;
721         ENTRY;
722
723         if (set == NULL)
724                 RETURN(0);
725         LASSERT(set->set_exp);
726         if (cfs_atomic_read(&set->set_completes))
727                 rc = create_done(set->set_exp, set, lsmp);
728
729         lov_put_reqset(set);
730         RETURN(rc);
731 }
732
733 int cb_create_update(void *cookie, int rc)
734 {
735         struct obd_info *oinfo = cookie;
736         struct lov_request *lovreq;
737
738         lovreq = container_of(oinfo, struct lov_request, rq_oi);
739
740         if (CFS_FAIL_CHECK(OBD_FAIL_MDS_OSC_CREATE_FAIL))
741                 if (lovreq->rq_idx == cfs_fail_val)
742                         rc = -ENOTCONN;
743
744         rc= lov_update_create_set(lovreq->rq_rqset, lovreq, rc);
745         if (lov_finished_set(lovreq->rq_rqset))
746                 lov_put_reqset(lovreq->rq_rqset);
747         return rc;
748 }
749
750 int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo,
751                         struct lov_stripe_md **lsmp, struct obdo *src_oa,
752                         struct obd_trans_info *oti,
753                         struct lov_request_set **reqset)
754 {
755         struct lov_request_set *set;
756         int rc = 0;
757         ENTRY;
758
759         OBD_ALLOC(set, sizeof(*set));
760         if (set == NULL)
761                 RETURN(-ENOMEM);
762         lov_init_set(set);
763
764         set->set_exp = exp;
765         set->set_oi = oinfo;
766         set->set_oi->oi_md = *lsmp;
767         set->set_oi->oi_oa = src_oa;
768         set->set_oti = oti;
769         lov_get_reqset(set);
770
771         rc = qos_prep_create(exp, set);
772         /* qos_shrink_lsm() may have allocated a new lsm */
773         *lsmp = oinfo->oi_md;
774         if (rc) {
775                 lov_fini_create_set(set, lsmp);
776                 lov_put_reqset(set);
777         } else {
778                 *reqset = set;
779         }
780         RETURN(rc);
781 }
782
783 static int common_attr_done(struct lov_request_set *set)
784 {
785         cfs_list_t *pos;
786         struct lov_request *req;
787         struct obdo *tmp_oa;
788         int rc = 0, attrset = 0;
789         ENTRY;
790
791         LASSERT(set->set_oi != NULL);
792
793         if (set->set_oi->oi_oa == NULL)
794                 RETURN(0);
795
796         if (!cfs_atomic_read(&set->set_success))
797                 RETURN(-EIO);
798
799         OBDO_ALLOC(tmp_oa);
800         if (tmp_oa == NULL)
801                 GOTO(out, rc = -ENOMEM);
802
803         cfs_list_for_each (pos, &set->set_list) {
804                 req = cfs_list_entry(pos, struct lov_request, rq_link);
805
806                 if (!req->rq_complete || req->rq_rc)
807                         continue;
808                 if (req->rq_oi.oi_oa->o_valid == 0)   /* inactive stripe */
809                         continue;
810                 lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa,
811                                 req->rq_oi.oi_oa->o_valid,
812                                 set->set_oi->oi_md, req->rq_stripe, &attrset);
813         }
814         if (!attrset) {
815                 CERROR("No stripes had valid attrs\n");
816                 rc = -EIO;
817         }
818         if ((set->set_oi->oi_oa->o_valid & OBD_MD_FLEPOCH) &&
819             (set->set_oi->oi_md->lsm_stripe_count != attrset)) {
820                 /* When we take attributes of some epoch, we require all the
821                  * ost to be active. */
822                 CERROR("Not all the stripes had valid attrs\n");
823                 GOTO(out, rc = -EIO);
824         }
825
826         tmp_oa->o_id = set->set_oi->oi_oa->o_id;
827         memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
828 out:
829         if (tmp_oa)
830                 OBDO_FREE(tmp_oa);
831         RETURN(rc);
832
833 }
834
835 static int brw_done(struct lov_request_set *set)
836 {
837         struct lov_stripe_md *lsm = set->set_oi->oi_md;
838         struct lov_oinfo     *loi = NULL;
839         cfs_list_t *pos;
840         struct lov_request *req;
841         ENTRY;
842
843         cfs_list_for_each (pos, &set->set_list) {
844                 req = cfs_list_entry(pos, struct lov_request, rq_link);
845
846                 if (!req->rq_complete || req->rq_rc)
847                         continue;
848
849                 loi = lsm->lsm_oinfo[req->rq_stripe];
850
851                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS)
852                         loi->loi_lvb.lvb_blocks = req->rq_oi.oi_oa->o_blocks;
853         }
854
855         RETURN(0);
856 }
857
858 int lov_fini_brw_set(struct lov_request_set *set)
859 {
860         int rc = 0;
861         ENTRY;
862
863         if (set == NULL)
864                 RETURN(0);
865         LASSERT(set->set_exp);
866         if (cfs_atomic_read(&set->set_completes)) {
867                 rc = brw_done(set);
868                 /* FIXME update qos data here */
869         }
870         lov_put_reqset(set);
871
872         RETURN(rc);
873 }
874
875 int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
876                      obd_count oa_bufs, struct brw_page *pga,
877                      struct obd_trans_info *oti,
878                      struct lov_request_set **reqset)
879 {
880         struct {
881                 obd_count       index;
882                 obd_count       count;
883                 obd_count       off;
884         } *info = NULL;
885         struct lov_request_set *set;
886         struct lov_obd *lov = &exp->exp_obd->u.lov;
887         int rc = 0, i, shift;
888         ENTRY;
889
890         OBD_ALLOC(set, sizeof(*set));
891         if (set == NULL)
892                 RETURN(-ENOMEM);
893         lov_init_set(set);
894
895         set->set_exp = exp;
896         set->set_oti = oti;
897         set->set_oi = oinfo;
898         set->set_oabufs = oa_bufs;
899         OBD_ALLOC_LARGE(set->set_pga, oa_bufs * sizeof(*set->set_pga));
900         if (!set->set_pga)
901                 GOTO(out, rc = -ENOMEM);
902
903         OBD_ALLOC_LARGE(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
904         if (!info)
905                 GOTO(out, rc = -ENOMEM);
906
907         /* calculate the page count for each stripe */
908         for (i = 0; i < oa_bufs; i++) {
909                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
910                 info[stripe].count++;
911         }
912
913         /* alloc and initialize lov request */
914         shift = 0;
915         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++){
916                 struct lov_oinfo *loi = NULL;
917                 struct lov_request *req;
918
919                 if (info[i].count == 0)
920                         continue;
921
922                 loi = oinfo->oi_md->lsm_oinfo[i];
923                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
924                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
925                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
926                         GOTO(out, rc = -EIO);
927                 }
928
929                 OBD_ALLOC(req, sizeof(*req));
930                 if (req == NULL)
931                         GOTO(out, rc = -ENOMEM);
932
933                 OBDO_ALLOC(req->rq_oi.oi_oa);
934                 if (req->rq_oi.oi_oa == NULL) {
935                         OBD_FREE(req, sizeof(*req));
936                         GOTO(out, rc = -ENOMEM);
937                 }
938
939                 if (oinfo->oi_oa) {
940                         memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
941                                sizeof(*req->rq_oi.oi_oa));
942                 }
943                 req->rq_oi.oi_oa->o_id = loi->loi_id;
944                 req->rq_oi.oi_oa->o_seq = loi->loi_seq;
945                 req->rq_oi.oi_oa->o_stripe_idx = i;
946
947                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
948                 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
949                 if (req->rq_oi.oi_md == NULL) {
950                         OBDO_FREE(req->rq_oi.oi_oa);
951                         OBD_FREE(req, sizeof(*req));
952                         GOTO(out, rc = -ENOMEM);
953                 }
954
955                 req->rq_idx = loi->loi_ost_idx;
956                 req->rq_stripe = i;
957
958                 /* XXX LOV STACKING */
959                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
960                 req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq;
961                 req->rq_oabufs = info[i].count;
962                 req->rq_pgaidx = shift;
963                 shift += req->rq_oabufs;
964
965                 /* remember the index for sort brw_page array */
966                 info[i].index = req->rq_pgaidx;
967
968                 req->rq_oi.oi_capa = oinfo->oi_capa;
969
970                 lov_set_add_req(req, set);
971         }
972         if (!set->set_count)
973                 GOTO(out, rc = -EIO);
974
975         /* rotate & sort the brw_page array */
976         for (i = 0; i < oa_bufs; i++) {
977                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
978
979                 shift = info[stripe].index + info[stripe].off;
980                 LASSERT(shift < oa_bufs);
981                 set->set_pga[shift] = pga[i];
982                 lov_stripe_offset(oinfo->oi_md, pga[i].off, stripe,
983                                   &set->set_pga[shift].off);
984                 info[stripe].off++;
985         }
986 out:
987         if (info)
988                 OBD_FREE_LARGE(info,
989                                sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
990
991         if (rc == 0)
992                 *reqset = set;
993         else
994                 lov_fini_brw_set(set);
995
996         RETURN(rc);
997 }
998
999 int lov_fini_getattr_set(struct lov_request_set *set)
1000 {
1001         int rc = 0;
1002         ENTRY;
1003
1004         if (set == NULL)
1005                 RETURN(0);
1006         LASSERT(set->set_exp);
1007         if (cfs_atomic_read(&set->set_completes))
1008                 rc = common_attr_done(set);
1009
1010         lov_put_reqset(set);
1011
1012         RETURN(rc);
1013 }
1014
1015 /* The callback for osc_getattr_async that finilizes a request info when a
1016  * response is received. */
1017 static int cb_getattr_update(void *cookie, int rc)
1018 {
1019         struct obd_info *oinfo = cookie;
1020         struct lov_request *lovreq;
1021         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1022         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
1023 }
1024
1025 int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
1026                          struct lov_request_set **reqset)
1027 {
1028         struct lov_request_set *set;
1029         struct lov_obd *lov = &exp->exp_obd->u.lov;
1030         int rc = 0, i;
1031         ENTRY;
1032
1033         OBD_ALLOC(set, sizeof(*set));
1034         if (set == NULL)
1035                 RETURN(-ENOMEM);
1036         lov_init_set(set);
1037
1038         set->set_exp = exp;
1039         set->set_oi = oinfo;
1040
1041         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1042                 struct lov_oinfo *loi;
1043                 struct lov_request *req;
1044
1045                 loi = oinfo->oi_md->lsm_oinfo[i];
1046                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1047                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1048                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1049                         if (oinfo->oi_oa->o_valid & OBD_MD_FLEPOCH)
1050                                 /* SOM requires all the OSTs to be active. */
1051                                 GOTO(out_set, rc = -EIO);
1052                         continue;
1053                 }
1054
1055                 OBD_ALLOC(req, sizeof(*req));
1056                 if (req == NULL)
1057                         GOTO(out_set, rc = -ENOMEM);
1058
1059                 req->rq_stripe = i;
1060                 req->rq_idx = loi->loi_ost_idx;
1061
1062                 OBDO_ALLOC(req->rq_oi.oi_oa);
1063                 if (req->rq_oi.oi_oa == NULL) {
1064                         OBD_FREE(req, sizeof(*req));
1065                         GOTO(out_set, rc = -ENOMEM);
1066                 }
1067                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1068                        sizeof(*req->rq_oi.oi_oa));
1069                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1070                 req->rq_oi.oi_oa->o_seq = loi->loi_seq;
1071                 req->rq_oi.oi_cb_up = cb_getattr_update;
1072                 req->rq_oi.oi_capa = oinfo->oi_capa;
1073
1074                 lov_set_add_req(req, set);
1075         }
1076         if (!set->set_count)
1077                 GOTO(out_set, rc = -EIO);
1078         *reqset = set;
1079         RETURN(rc);
1080 out_set:
1081         lov_fini_getattr_set(set);
1082         RETURN(rc);
1083 }
1084
1085 int lov_fini_destroy_set(struct lov_request_set *set)
1086 {
1087         ENTRY;
1088
1089         if (set == NULL)
1090                 RETURN(0);
1091         LASSERT(set->set_exp);
1092         if (cfs_atomic_read(&set->set_completes)) {
1093                 /* FIXME update qos data here */
1094         }
1095
1096         lov_put_reqset(set);
1097
1098         RETURN(0);
1099 }
1100
1101 int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
1102                          struct obdo *src_oa, struct lov_stripe_md *lsm,
1103                          struct obd_trans_info *oti,
1104                          struct lov_request_set **reqset)
1105 {
1106         struct lov_request_set *set;
1107         struct lov_obd *lov = &exp->exp_obd->u.lov;
1108         int rc = 0, i;
1109         ENTRY;
1110
1111         OBD_ALLOC(set, sizeof(*set));
1112         if (set == NULL)
1113                 RETURN(-ENOMEM);
1114         lov_init_set(set);
1115
1116         set->set_exp = exp;
1117         set->set_oi = oinfo;
1118         set->set_oi->oi_md = lsm;
1119         set->set_oi->oi_oa = src_oa;
1120         set->set_oti = oti;
1121         if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
1122                 set->set_cookies = oti->oti_logcookies;
1123
1124         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1125                 struct lov_oinfo *loi;
1126                 struct lov_request *req;
1127
1128                 loi = lsm->lsm_oinfo[i];
1129                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1130                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1131                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1132                         continue;
1133                 }
1134
1135                 OBD_ALLOC(req, sizeof(*req));
1136                 if (req == NULL)
1137                         GOTO(out_set, rc = -ENOMEM);
1138
1139                 req->rq_stripe = i;
1140                 req->rq_idx = loi->loi_ost_idx;
1141
1142                 OBDO_ALLOC(req->rq_oi.oi_oa);
1143                 if (req->rq_oi.oi_oa == NULL) {
1144                         OBD_FREE(req, sizeof(*req));
1145                         GOTO(out_set, rc = -ENOMEM);
1146                 }
1147                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1148                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1149                 req->rq_oi.oi_oa->o_seq = loi->loi_seq;
1150                 lov_set_add_req(req, set);
1151         }
1152         if (!set->set_count)
1153                 GOTO(out_set, rc = -EIO);
1154         *reqset = set;
1155         RETURN(rc);
1156 out_set:
1157         lov_fini_destroy_set(set);
1158         RETURN(rc);
1159 }
1160
1161 int lov_fini_setattr_set(struct lov_request_set *set)
1162 {
1163         int rc = 0;
1164         ENTRY;
1165
1166         if (set == NULL)
1167                 RETURN(0);
1168         LASSERT(set->set_exp);
1169         if (cfs_atomic_read(&set->set_completes)) {
1170                 rc = common_attr_done(set);
1171                 /* FIXME update qos data here */
1172         }
1173
1174         lov_put_reqset(set);
1175         RETURN(rc);
1176 }
1177
1178 int lov_update_setattr_set(struct lov_request_set *set,
1179                            struct lov_request *req, int rc)
1180 {
1181         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1182         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1183         ENTRY;
1184
1185         lov_update_set(set, req, rc);
1186
1187         /* grace error on inactive ost */
1188         if (rc && !(lov->lov_tgts[req->rq_idx] &&
1189                     lov->lov_tgts[req->rq_idx]->ltd_active))
1190                 rc = 0;
1191
1192         if (rc == 0) {
1193                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME)
1194                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_ctime =
1195                                 req->rq_oi.oi_oa->o_ctime;
1196                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME)
1197                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_mtime =
1198                                 req->rq_oi.oi_oa->o_mtime;
1199                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME)
1200                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_atime =
1201                                 req->rq_oi.oi_oa->o_atime;
1202         }
1203
1204         RETURN(rc);
1205 }
1206
1207 /* The callback for osc_setattr_async that finilizes a request info when a
1208  * response is received. */
1209 static int cb_setattr_update(void *cookie, int rc)
1210 {
1211         struct obd_info *oinfo = cookie;
1212         struct lov_request *lovreq;
1213         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1214         return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
1215 }
1216
1217 int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
1218                          struct obd_trans_info *oti,
1219                          struct lov_request_set **reqset)
1220 {
1221         struct lov_request_set *set;
1222         struct lov_obd *lov = &exp->exp_obd->u.lov;
1223         int rc = 0, i;
1224         ENTRY;
1225
1226         OBD_ALLOC(set, sizeof(*set));
1227         if (set == NULL)
1228                 RETURN(-ENOMEM);
1229         lov_init_set(set);
1230
1231         set->set_exp = exp;
1232         set->set_oti = oti;
1233         set->set_oi = oinfo;
1234         if (oti != NULL && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
1235                 set->set_cookies = oti->oti_logcookies;
1236
1237         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1238                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1239                 struct lov_request *req;
1240
1241                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1242                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1243                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1244                         continue;
1245                 }
1246
1247                 OBD_ALLOC(req, sizeof(*req));
1248                 if (req == NULL)
1249                         GOTO(out_set, rc = -ENOMEM);
1250                 req->rq_stripe = i;
1251                 req->rq_idx = loi->loi_ost_idx;
1252
1253                 OBDO_ALLOC(req->rq_oi.oi_oa);
1254                 if (req->rq_oi.oi_oa == NULL) {
1255                         OBD_FREE(req, sizeof(*req));
1256                         GOTO(out_set, rc = -ENOMEM);
1257                 }
1258                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1259                        sizeof(*req->rq_oi.oi_oa));
1260                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1261                 req->rq_oi.oi_oa->o_seq= loi->loi_seq;
1262                 req->rq_oi.oi_oa->o_stripe_idx = i;
1263                 req->rq_oi.oi_cb_up = cb_setattr_update;
1264                 req->rq_oi.oi_capa = oinfo->oi_capa;
1265
1266                 if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
1267                         int off = lov_stripe_offset(oinfo->oi_md,
1268                                                     oinfo->oi_oa->o_size, i,
1269                                                     &req->rq_oi.oi_oa->o_size);
1270
1271                         if (off < 0 && req->rq_oi.oi_oa->o_size)
1272                                 req->rq_oi.oi_oa->o_size--;
1273
1274                         CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1275                                i, req->rq_oi.oi_oa->o_size,
1276                                oinfo->oi_oa->o_size);
1277                 }
1278                 lov_set_add_req(req, set);
1279         }
1280         if (!set->set_count)
1281                 GOTO(out_set, rc = -EIO);
1282         *reqset = set;
1283         RETURN(rc);
1284 out_set:
1285         lov_fini_setattr_set(set);
1286         RETURN(rc);
1287 }
1288
1289 int lov_fini_punch_set(struct lov_request_set *set)
1290 {
1291         int rc = 0;
1292         ENTRY;
1293
1294         if (set == NULL)
1295                 RETURN(0);
1296         LASSERT(set->set_exp);
1297         if (cfs_atomic_read(&set->set_completes)) {
1298                 rc = -EIO;
1299                 /* FIXME update qos data here */
1300                 if (cfs_atomic_read(&set->set_success))
1301                         rc = common_attr_done(set);
1302         }
1303
1304         lov_put_reqset(set);
1305
1306         RETURN(rc);
1307 }
1308
1309 int lov_update_punch_set(struct lov_request_set *set,
1310                          struct lov_request *req, int rc)
1311 {
1312         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1313         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1314         ENTRY;
1315
1316         lov_update_set(set, req, rc);
1317
1318         /* grace error on inactive ost */
1319         if (rc && !lov->lov_tgts[req->rq_idx]->ltd_active)
1320                 rc = 0;
1321
1322         if (rc == 0) {
1323                 lov_stripe_lock(lsm);
1324                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS) {
1325                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_blocks =
1326                                 req->rq_oi.oi_oa->o_blocks;
1327                 }
1328
1329                 /* Do we need to update lvb_size here? It needn't because
1330                  * it have been done in ll_truncate(). -jay */
1331                 lov_stripe_unlock(lsm);
1332         }
1333
1334         RETURN(rc);
1335 }
1336
1337 /* The callback for osc_punch that finilizes a request info when a response
1338  * is received. */
1339 static int cb_update_punch(void *cookie, int rc)
1340 {
1341         struct obd_info *oinfo = cookie;
1342         struct lov_request *lovreq;
1343         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1344         return lov_update_punch_set(lovreq->rq_rqset, lovreq, rc);
1345 }
1346
1347 int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo,
1348                        struct obd_trans_info *oti,
1349                        struct lov_request_set **reqset)
1350 {
1351         struct lov_request_set *set;
1352         struct lov_obd *lov = &exp->exp_obd->u.lov;
1353         int rc = 0, i;
1354         ENTRY;
1355
1356         OBD_ALLOC(set, sizeof(*set));
1357         if (set == NULL)
1358                 RETURN(-ENOMEM);
1359         lov_init_set(set);
1360
1361         set->set_oi = oinfo;
1362         set->set_exp = exp;
1363
1364         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1365                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1366                 struct lov_request *req;
1367                 obd_off rs, re;
1368
1369                 if (!lov_stripe_intersects(oinfo->oi_md, i,
1370                                            oinfo->oi_policy.l_extent.start,
1371                                            oinfo->oi_policy.l_extent.end,
1372                                            &rs, &re))
1373                         continue;
1374
1375                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1376                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1377                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1378                         GOTO(out_set, rc = -EIO);
1379                 }
1380
1381                 OBD_ALLOC(req, sizeof(*req));
1382                 if (req == NULL)
1383                         GOTO(out_set, rc = -ENOMEM);
1384                 req->rq_stripe = i;
1385                 req->rq_idx = loi->loi_ost_idx;
1386
1387                 OBDO_ALLOC(req->rq_oi.oi_oa);
1388                 if (req->rq_oi.oi_oa == NULL) {
1389                         OBD_FREE(req, sizeof(*req));
1390                         GOTO(out_set, rc = -ENOMEM);
1391                 }
1392                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1393                        sizeof(*req->rq_oi.oi_oa));
1394                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1395                 req->rq_oi.oi_oa->o_seq = loi->loi_seq;
1396                 req->rq_oi.oi_oa->o_valid |= OBD_MD_FLGROUP;
1397
1398                 req->rq_oi.oi_oa->o_stripe_idx = i;
1399                 req->rq_oi.oi_cb_up = cb_update_punch;
1400
1401                 req->rq_oi.oi_policy.l_extent.start = rs;
1402                 req->rq_oi.oi_policy.l_extent.end = re;
1403                 req->rq_oi.oi_policy.l_extent.gid = -1;
1404
1405                 req->rq_oi.oi_capa = oinfo->oi_capa;
1406
1407                 lov_set_add_req(req, set);
1408         }
1409         if (!set->set_count)
1410                 GOTO(out_set, rc = -EIO);
1411         *reqset = set;
1412         RETURN(rc);
1413 out_set:
1414         lov_fini_punch_set(set);
1415         RETURN(rc);
1416 }
1417
1418 int lov_fini_sync_set(struct lov_request_set *set)
1419 {
1420         int rc = 0;
1421         ENTRY;
1422
1423         if (set == NULL)
1424                 RETURN(0);
1425         LASSERT(set->set_exp);
1426         if (cfs_atomic_read(&set->set_completes)) {
1427                 if (!cfs_atomic_read(&set->set_success))
1428                         rc = -EIO;
1429                 /* FIXME update qos data here */
1430         }
1431
1432         lov_put_reqset(set);
1433
1434         RETURN(rc);
1435 }
1436
1437 /* The callback for osc_sync that finilizes a request info when a
1438  * response is recieved. */
1439 static int cb_sync_update(void *cookie, int rc)
1440 {
1441         struct obd_info *oinfo = cookie;
1442         struct lov_request *lovreq;
1443
1444         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1445         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
1446 }
1447
1448 int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
1449                       obd_off start, obd_off end,
1450                       struct lov_request_set **reqset)
1451 {
1452         struct lov_request_set *set;
1453         struct lov_obd *lov = &exp->exp_obd->u.lov;
1454         int rc = 0, i;
1455         ENTRY;
1456
1457         OBD_ALLOC_PTR(set);
1458         if (set == NULL)
1459                 RETURN(-ENOMEM);
1460         lov_init_set(set);
1461
1462         set->set_exp = exp;
1463         set->set_oi = oinfo;
1464
1465         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1466                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1467                 struct lov_request *req;
1468                 obd_off rs, re;
1469
1470                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1471                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1472                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1473                         continue;
1474                 }
1475
1476                 if (!lov_stripe_intersects(oinfo->oi_md, i, start, end, &rs,
1477                                            &re))
1478                         continue;
1479
1480                 OBD_ALLOC_PTR(req);
1481                 if (req == NULL)
1482                         GOTO(out_set, rc = -ENOMEM);
1483                 req->rq_stripe = i;
1484                 req->rq_idx = loi->loi_ost_idx;
1485
1486                 OBDO_ALLOC(req->rq_oi.oi_oa);
1487                 if (req->rq_oi.oi_oa == NULL) {
1488                         OBD_FREE(req, sizeof(*req));
1489                         GOTO(out_set, rc = -ENOMEM);
1490                 }
1491                 *req->rq_oi.oi_oa = *oinfo->oi_oa;
1492                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1493                 req->rq_oi.oi_oa->o_seq = loi->loi_seq;
1494                 req->rq_oi.oi_oa->o_stripe_idx = i;
1495
1496                 req->rq_oi.oi_policy.l_extent.start = rs;
1497                 req->rq_oi.oi_policy.l_extent.end = re;
1498                 req->rq_oi.oi_policy.l_extent.gid = -1;
1499                 req->rq_oi.oi_cb_up = cb_sync_update;
1500
1501                 lov_set_add_req(req, set);
1502         }
1503         if (!set->set_count)
1504                 GOTO(out_set, rc = -EIO);
1505         *reqset = set;
1506         RETURN(rc);
1507 out_set:
1508         lov_fini_sync_set(set);
1509         RETURN(rc);
1510 }
1511
/* All-ones value of a __u64; used below as the saturation ceiling. */
#define LOV_U64_MAX ((__u64)~0ULL)

/*
 * Saturating accumulate: tot += add, except that tot is pinned to
 * LOV_U64_MAX when the unsigned sum wraps around.  Both arguments are
 * evaluated more than once, so they must be free of side effects.
 */
#define LOV_SUM_MAX(tot, add)                                           \
        do {                                                            \
                (tot) = ((tot) + (add) < (tot)) ?                       \
                        LOV_U64_MAX : (tot) + (add);                    \
        } while (0)
1520
1521 int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success)
1522 {
1523         ENTRY;
1524
1525         if (success) {
1526                 __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov,
1527                                                            LOV_MAGIC, 0);
1528                 if (osfs->os_files != LOV_U64_MAX)
1529                         lov_do_div64(osfs->os_files, expected_stripes);
1530                 if (osfs->os_ffree != LOV_U64_MAX)
1531                         lov_do_div64(osfs->os_ffree, expected_stripes);
1532
1533                 cfs_spin_lock(&obd->obd_osfs_lock);
1534                 memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
1535                 obd->obd_osfs_age = cfs_time_current_64();
1536                 cfs_spin_unlock(&obd->obd_osfs_lock);
1537                 RETURN(0);
1538         }
1539
1540         RETURN(-EIO);
1541 }
1542
1543 int lov_fini_statfs_set(struct lov_request_set *set)
1544 {
1545         int rc = 0;
1546         ENTRY;
1547
1548         if (set == NULL)
1549                 RETURN(0);
1550
1551         if (cfs_atomic_read(&set->set_completes)) {
1552                 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
1553                                      cfs_atomic_read(&set->set_success));
1554         }
1555         lov_put_reqset(set);
1556         RETURN(rc);
1557 }
1558
1559 void lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs,
1560                        int success)
1561 {
1562         int shift = 0, quit = 0;
1563         __u64 tmp;
1564
1565         if (success == 0) {
1566                 memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
1567         } else {
1568                 if (osfs->os_bsize != lov_sfs->os_bsize) {
1569                         /* assume all block sizes are always powers of 2 */
1570                         /* get the bits difference */
1571                         tmp = osfs->os_bsize | lov_sfs->os_bsize;
1572                         for (shift = 0; shift <= 64; ++shift) {
1573                                 if (tmp & 1) {
1574                                         if (quit)
1575                                                 break;
1576                                         else
1577                                                 quit = 1;
1578                                         shift = 0;
1579                                 }
1580                                 tmp >>= 1;
1581                         }
1582                 }
1583
1584                 if (osfs->os_bsize < lov_sfs->os_bsize) {
1585                         osfs->os_bsize = lov_sfs->os_bsize;
1586
1587                         osfs->os_bfree  >>= shift;
1588                         osfs->os_bavail >>= shift;
1589                         osfs->os_blocks >>= shift;
1590                 } else if (shift != 0) {
1591                         lov_sfs->os_bfree  >>= shift;
1592                         lov_sfs->os_bavail >>= shift;
1593                         lov_sfs->os_blocks >>= shift;
1594                 }
1595 #ifdef MIN_DF
1596                 /* Sandia requested that df (and so, statfs) only
1597                    returned minimal available space on
1598                    a single OST, so people would be able to
1599                    write this much data guaranteed. */
1600                 if (osfs->os_bavail > lov_sfs->os_bavail) {
1601                         /* Presumably if new bavail is smaller,
1602                            new bfree is bigger as well */
1603                         osfs->os_bfree = lov_sfs->os_bfree;
1604                         osfs->os_bavail = lov_sfs->os_bavail;
1605                 }
1606 #else
1607                 osfs->os_bfree += lov_sfs->os_bfree;
1608                 osfs->os_bavail += lov_sfs->os_bavail;
1609 #endif
1610                 osfs->os_blocks += lov_sfs->os_blocks;
1611                 /* XXX not sure about this one - depends on policy.
1612                  *   - could be minimum if we always stripe on all OBDs
1613                  *     (but that would be wrong for any other policy,
1614                  *     if one of the OBDs has no more objects left)
1615                  *   - could be sum if we stripe whole objects
1616                  *   - could be average, just to give a nice number
1617                  *
1618                  * To give a "reasonable" (if not wholly accurate)
1619                  * number, we divide the total number of free objects
1620                  * by expected stripe count (watch out for overflow).
1621                  */
1622                 LOV_SUM_MAX(osfs->os_files, lov_sfs->os_files);
1623                 LOV_SUM_MAX(osfs->os_ffree, lov_sfs->os_ffree);
1624         }
1625 }
1626
1627 /* The callback for osc_statfs_async that finilizes a request info when a
1628  * response is received. */
1629 static int cb_statfs_update(void *cookie, int rc)
1630 {
1631         struct obd_info *oinfo = cookie;
1632         struct lov_request *lovreq;
1633         struct lov_request_set *set;
1634         struct obd_statfs *osfs, *lov_sfs;
1635         struct lov_obd *lov;
1636         struct lov_tgt_desc *tgt;
1637         struct obd_device *lovobd, *tgtobd;
1638         int success;
1639         ENTRY;
1640
1641         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1642         set = lovreq->rq_rqset;
1643         lovobd = set->set_obd;
1644         lov = &lovobd->u.lov;
1645         osfs = set->set_oi->oi_osfs;
1646         lov_sfs = oinfo->oi_osfs;
1647         success = cfs_atomic_read(&set->set_success);
1648         /* XXX: the same is done in lov_update_common_set, however
1649            lovset->set_exp is not initialized. */
1650         lov_update_set(set, lovreq, rc);
1651         if (rc)
1652                 GOTO(out, rc);
1653
1654         obd_getref(lovobd);
1655         tgt = lov->lov_tgts[lovreq->rq_idx];
1656         if (!tgt || !tgt->ltd_active)
1657                 GOTO(out_update, rc);
1658
1659         tgtobd = class_exp2obd(tgt->ltd_exp);
1660         cfs_spin_lock(&tgtobd->obd_osfs_lock);
1661         memcpy(&tgtobd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
1662         if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0)
1663                 tgtobd->obd_osfs_age = cfs_time_current_64();
1664         cfs_spin_unlock(&tgtobd->obd_osfs_lock);
1665
1666 out_update:
1667         lov_update_statfs(osfs, lov_sfs, success);
1668         qos_update(lov);
1669         obd_putref(lovobd);
1670
1671 out:
1672         if (set->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
1673             lov_finished_set(set)) {
1674                 lov_statfs_interpret(NULL, set, set->set_count !=
1675                                      cfs_atomic_read(&set->set_success));
1676                 if (lov->lov_qos.lq_statfs_in_progress)
1677                         qos_statfs_done(lov);
1678         }
1679
1680         RETURN(0);
1681 }
1682
1683 int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
1684                         struct lov_request_set **reqset)
1685 {
1686         struct lov_request_set *set;
1687         struct lov_obd *lov = &obd->u.lov;
1688         int rc = 0, i;
1689         ENTRY;
1690
1691         OBD_ALLOC(set, sizeof(*set));
1692         if (set == NULL)
1693                 RETURN(-ENOMEM);
1694         lov_init_set(set);
1695
1696         set->set_obd = obd;
1697         set->set_oi = oinfo;
1698
1699         /* We only get block data from the OBD */
1700         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1701                 struct lov_request *req;
1702
1703                 if (!lov->lov_tgts[i] || (!lov->lov_tgts[i]->ltd_active
1704                                           && (oinfo->oi_flags & OBD_STATFS_NODELAY))) {
1705                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
1706                         continue;
1707                 }
1708
1709                 /* skip targets that have been explicitely disabled by the
1710                  * administrator */
1711                 if (!lov->lov_tgts[i]->ltd_exp) {
1712                         CDEBUG(D_HA, "lov idx %d administratively disabled\n", i);
1713                         continue;
1714                 }
1715
1716                 OBD_ALLOC(req, sizeof(*req));
1717                 if (req == NULL)
1718                         GOTO(out_set, rc = -ENOMEM);
1719
1720                 OBD_ALLOC(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs));
1721                 if (req->rq_oi.oi_osfs == NULL) {
1722                         OBD_FREE(req, sizeof(*req));
1723                         GOTO(out_set, rc = -ENOMEM);
1724                 }
1725
1726                 req->rq_idx = i;
1727                 req->rq_oi.oi_cb_up = cb_statfs_update;
1728                 req->rq_oi.oi_flags = oinfo->oi_flags;
1729
1730                 lov_set_add_req(req, set);
1731         }
1732         if (!set->set_count)
1733                 GOTO(out_set, rc = -EIO);
1734         *reqset = set;
1735         RETURN(rc);
1736 out_set:
1737         lov_fini_statfs_set(set);
1738         RETURN(rc);
1739 }