Whamcloud - gitweb
LU-6587 obdclass: use OBD_FREE_LARGE with OBD_ALLOC_LARGE
[fs/lustre-release.git] / lustre / osp / osp_trans.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2014, 2015, Intel Corporation.
24  */
25 /*
26  * lustre/osp/osp_trans.c
27  *
28  *
29  * 1. OSP (Object Storage Proxy) transaction methods
30  *
31  * Implement OSP layer transaction related interfaces for the dt_device API
32  * dt_device_operations.
33  *
34  *
35  * 2. Handle asynchronous idempotent operations
36  *
37  * The OSP uses OUT (Object Unified Target) RPC to talk with other server
38  * (MDT or OST) for kinds of operations, such as create, unlink, insert,
39  * delete, lookup, set_(x)attr, get_(x)attr, and etc. To reduce the number
40  * of RPCs, we allow multiple operations to be packaged together in single
41  * OUT RPC.
42  *
43  * For the asynchronous idempotent operations, such as get_(x)attr, related
44  * RPCs will be inserted into an osp_device based shared asynchronous request
45  * queue - osp_device::opd_async_requests. When the queue is full, all the
46  * requests in the queue will be packaged into a single OUT RPC and given to
47  * the ptlrpcd daemon (for sending), then the queue is purged and other new
48  * requests can be inserted into it.
49  *
50  * When the asynchronous idempotent operation inserts the request into the
51  * shared queue, it will register an interpreter. When the packaged OUT RPC
52  * is replied (or failed to be sent out), all the registered interpreters
53  * will be called one by one to handle each own result.
54  *
55  *
56  * There are three kinds of transactions
57  *
58  * 1. Local transaction, all of updates of the transaction are in the local MDT.
59  * 2. Remote transaction, all of updates of the transaction are in one remote
60  * MDT, which only happens in LFSCK now.
61  * 3. Distribute transaction, updates for the transaction are in mulitple MDTs.
62  *
63  * Author: Di Wang <di.wang@intel.com>
64  * Author: Fan, Yong <fan.yong@intel.com>
65  */
66
67 #define DEBUG_SUBSYSTEM S_MDS
68
69 #include <lustre_net.h>
70 #include "osp_internal.h"
71
72 /**
73  * The argument for the interpreter callback of osp request.
74  */
75 struct osp_update_args {
76         struct osp_update_request *oaua_update;
77         atomic_t                 *oaua_count;
78         wait_queue_head_t        *oaua_waitq;
79         bool                      oaua_flow_control;
80 };
81
82 /**
83  * Call back for each update request.
84  */
85 struct osp_update_callback {
86         /* list in the osp_update_request::our_cb_items */
87         struct list_head                 ouc_list;
88
89         /* The target of the async update request. */
90         struct osp_object               *ouc_obj;
91
92         /* The data used by or_interpreter. */
93         void                            *ouc_data;
94
95         /* The interpreter function called after the async request handled. */
96         osp_update_interpreter_t        ouc_interpreter;
97 };
98
99 static struct object_update_request *object_update_request_alloc(size_t size)
100 {
101         struct object_update_request *ourq;
102
103         OBD_ALLOC_LARGE(ourq, size);
104         if (ourq == NULL)
105                 return ERR_PTR(-ENOMEM);
106
107         ourq->ourq_magic = UPDATE_REQUEST_MAGIC;
108         ourq->ourq_count = 0;
109
110         return ourq;
111 }
112
113 /**
114  * Allocate new update request
115  *
116  * Allocate new update request and insert it to the req_update_list.
117  *
118  * \param [in] our      osp_udate_request where to create a new
119  *                      update request
120  *
121  * \retval      0 if creation succeeds.
122  * \retval      negative errno if creation fails.
123  */
124 int osp_object_update_request_create(struct osp_update_request *our,
125                                      size_t size)
126 {
127         struct osp_update_request_sub *ours;
128
129         OBD_ALLOC_PTR(ours);
130         if (ours == NULL)
131                 return -ENOMEM;
132
133         if (size < OUT_UPDATE_INIT_BUFFER_SIZE)
134                 size = OUT_UPDATE_INIT_BUFFER_SIZE;
135
136         ours->ours_req = object_update_request_alloc(size);
137
138         if (IS_ERR(ours->ours_req)) {
139                 OBD_FREE_PTR(ours);
140                 return -ENOMEM;
141         }
142
143         ours->ours_req_size = size;
144         INIT_LIST_HEAD(&ours->ours_list);
145         list_add_tail(&ours->ours_list, &our->our_req_list);
146         our->our_req_nr++;
147
148         return 0;
149 }
150
151 /**
152  * Get current update request
153  *
154  * Get current object update request from our_req_list in
155  * osp_update_request, because we always insert the new update
156  * request in the last position, so the last update request
157  * in the list will be the current update req.
158  *
159  * \param[in] our       osp update request where to get the
160  *                      current object update.
161  *
162  * \retval              the current object update.
163  **/
164 struct osp_update_request_sub *
165 osp_current_object_update_request(struct osp_update_request *our)
166 {
167         if (list_empty(&our->our_req_list))
168                 return NULL;
169
170         return list_entry(our->our_req_list.prev, struct osp_update_request_sub,
171                           ours_list);
172 }
173
174 /**
175  * Allocate and initialize osp_update_request
176  *
177  * osp_update_request is being used to track updates being executed on
178  * this dt_device(OSD or OSP). The update buffer will be 4k initially,
179  * and increased if needed.
180  *
181  * \param [in] dt       dt device
182  *
183  * \retval              osp_update_request being allocated if succeed
184  * \retval              ERR_PTR(errno) if failed
185  */
186 struct osp_update_request *osp_update_request_create(struct dt_device *dt)
187 {
188         struct osp_update_request *our;
189
190         OBD_ALLOC_PTR(our);
191         if (our == NULL)
192                 return ERR_PTR(-ENOMEM);
193
194         INIT_LIST_HEAD(&our->our_req_list);
195         INIT_LIST_HEAD(&our->our_cb_items);
196         INIT_LIST_HEAD(&our->our_list);
197         spin_lock_init(&our->our_list_lock);
198
199         osp_object_update_request_create(our, OUT_UPDATE_INIT_BUFFER_SIZE);
200         return our;
201 }
202
203 void osp_update_request_destroy(struct osp_update_request *our)
204 {
205         struct osp_update_request_sub *ours;
206         struct osp_update_request_sub *tmp;
207
208         if (our == NULL)
209                 return;
210
211         list_for_each_entry_safe(ours, tmp, &our->our_req_list, ours_list) {
212                 list_del(&ours->ours_list);
213                 if (ours->ours_req != NULL)
214                         OBD_FREE_LARGE(ours->ours_req, ours->ours_req_size);
215                 OBD_FREE_PTR(ours);
216         }
217         OBD_FREE_PTR(our);
218 }
219
220 static void
221 object_update_request_dump(const struct object_update_request *ourq,
222                            unsigned int mask)
223 {
224         unsigned int i;
225         size_t total_size = 0;
226
227         for (i = 0; i < ourq->ourq_count; i++) {
228                 struct object_update    *update;
229                 size_t                  size = 0;
230
231                 update = object_update_request_get(ourq, i, &size);
232                 LASSERT(update != NULL);
233                 CDEBUG(mask, "i = %u fid = "DFID" op = %s "
234                        "params = %d batchid = "LPU64" size = %zu repsize %u\n",
235                        i, PFID(&update->ou_fid),
236                        update_op_str(update->ou_type),
237                        update->ou_params_count,
238                        update->ou_batchid, size,
239                        (unsigned)update->ou_result_size);
240
241                 total_size += size;
242         }
243
244         CDEBUG(mask, "updates = %p magic = %x count = %d size = %zu\n", ourq,
245                ourq->ourq_magic, ourq->ourq_count, total_size);
246 }
247
248 /**
249  * Prepare inline update request
250  *
251  * Prepare OUT update ptlrpc inline request, and the request usually includes
252  * one update buffer, which does not need bulk transfer.
253  *
254  * \param[in] env       execution environment
255  * \param[in] req       ptlrpc request
256  * \param[in] ours      sub osp_update_request to be packed
257  *
258  * \retval              0 if packing succeeds
259  * \retval              negative errno if packing fails
260  */
261 int osp_prep_inline_update_req(const struct lu_env *env,
262                                struct ptlrpc_request *req,
263                                struct osp_update_request *our,
264                                int repsize)
265 {
266         struct osp_update_request_sub *ours;
267         struct out_update_header *ouh;
268         __u32 update_req_size;
269         int rc;
270
271         ours = list_entry(our->our_req_list.next,
272                           struct osp_update_request_sub, ours_list);
273         update_req_size = object_update_request_size(ours->ours_req);
274         req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_HEADER, RCL_CLIENT,
275                              update_req_size + sizeof(*ouh));
276
277         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, OUT_UPDATE);
278         if (rc != 0)
279                 RETURN(rc);
280
281         ouh = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE_HEADER);
282         ouh->ouh_magic = OUT_UPDATE_HEADER_MAGIC;
283         ouh->ouh_count = 1;
284         ouh->ouh_inline_length = update_req_size;
285         ouh->ouh_reply_size = repsize;
286
287         memcpy(ouh->ouh_inline_data, ours->ours_req, update_req_size);
288
289         req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_REPLY,
290                              RCL_SERVER, repsize);
291
292         ptlrpc_request_set_replen(req);
293         req->rq_request_portal = OUT_PORTAL;
294         req->rq_reply_portal = OSC_REPLY_PORTAL;
295
296         RETURN(rc);
297 }
298
299 /**
300  * Prepare update request.
301  *
302  * Prepare OUT update ptlrpc request, and the request usually includes
303  * all of updates (stored in \param ureq) from one operation.
304  *
305  * \param[in] env       execution environment
306  * \param[in] imp       import on which ptlrpc request will be sent
307  * \param[in] ureq      hold all of updates which will be packed into the req
308  * \param[in] reqp      request to be created
309  *
310  * \retval              0 if preparation succeeds.
311  * \retval              negative errno if preparation fails.
312  */
313 int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp,
314                         struct osp_update_request *our,
315                         struct ptlrpc_request **reqp)
316 {
317         struct ptlrpc_request           *req;
318         struct ptlrpc_bulk_desc         *desc;
319         struct osp_update_request_sub   *ours;
320         const struct object_update_request *ourq;
321         struct out_update_header        *ouh;
322         struct out_update_buffer        *oub;
323         __u32                           buf_count = 0;
324         int                             repsize = 0;
325         struct object_update_reply      *reply;
326         int                             rc, i;
327         int                             total = 0;
328         ENTRY;
329
330         list_for_each_entry(ours, &our->our_req_list, ours_list) {
331                 object_update_request_dump(ours->ours_req, D_INFO);
332
333                 ourq = ours->ours_req;
334                 for (i = 0; i < ourq->ourq_count; i++) {
335                         struct object_update    *update;
336                         size_t                  size = 0;
337
338
339                         /* XXX: it's very inefficient to lookup update
340                          *      this way, iterating from the beginning
341                          *      each time */
342                         update = object_update_request_get(ourq, i, &size);
343                         LASSERT(update != NULL);
344
345                         repsize += sizeof(reply->ourp_lens[0]);
346                         repsize += sizeof(struct object_update_result);
347                         repsize += update->ou_result_size;
348                 }
349
350                 buf_count++;
351         }
352         repsize += sizeof(*reply);
353         repsize = (repsize + OUT_UPDATE_REPLY_SIZE - 1) &
354                         ~(OUT_UPDATE_REPLY_SIZE - 1);
355         LASSERT(buf_count > 0);
356
357         req = ptlrpc_request_alloc(imp, &RQF_OUT_UPDATE);
358         if (req == NULL)
359                 RETURN(-ENOMEM);
360
361         if (buf_count == 1) {
362                 ours = list_entry(our->our_req_list.next,
363                                   struct osp_update_request_sub, ours_list);
364
365                 /* Let's check if it can be packed inline */
366                 if (object_update_request_size(ours->ours_req) +
367                     sizeof(struct out_update_header) <
368                                 OUT_UPDATE_MAX_INLINE_SIZE) {
369                         rc = osp_prep_inline_update_req(env, req, our, repsize);
370                         if (rc == 0)
371                                 *reqp = req;
372                         GOTO(out_req, rc);
373                 }
374         }
375
376         req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_HEADER, RCL_CLIENT,
377                              sizeof(struct osp_update_request));
378
379         req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_BUF, RCL_CLIENT,
380                              buf_count * sizeof(*oub));
381
382         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, OUT_UPDATE);
383         if (rc != 0)
384                 GOTO(out_req, rc);
385
386         ouh = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE_HEADER);
387         ouh->ouh_magic = OUT_UPDATE_HEADER_MAGIC;
388         ouh->ouh_count = buf_count;
389         ouh->ouh_inline_length = 0;
390         ouh->ouh_reply_size = repsize;
391         oub = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE_BUF);
392         list_for_each_entry(ours, &our->our_req_list, ours_list) {
393                 oub->oub_size = ours->ours_req_size;
394                 oub++;
395         }
396
397         req->rq_bulk_write = 1;
398         desc = ptlrpc_prep_bulk_imp(req, buf_count,
399                 MD_MAX_BRW_SIZE >> LNET_MTU_BITS,
400                 PTLRPC_BULK_GET_SOURCE | PTLRPC_BULK_BUF_KVEC,
401                 MDS_BULK_PORTAL, &ptlrpc_bulk_kvec_ops);
402         if (desc == NULL)
403                 GOTO(out_req, rc = -ENOMEM);
404
405         /* NB req now owns desc and will free it when it gets freed */
406         list_for_each_entry(ours, &our->our_req_list, ours_list) {
407                 desc->bd_frag_ops->add_iov_frag(desc, ours->ours_req,
408                                                 ours->ours_req_size);
409                 total += ours->ours_req_size;
410         }
411         CDEBUG(D_OTHER, "total %d in %u\n", total, our->our_update_nr);
412
413         req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_REPLY,
414                              RCL_SERVER, repsize);
415
416         ptlrpc_request_set_replen(req);
417         req->rq_request_portal = OUT_PORTAL;
418         req->rq_reply_portal = OSC_REPLY_PORTAL;
419         *reqp = req;
420
421 out_req:
422         if (rc < 0)
423                 ptlrpc_req_finished(req);
424
425         RETURN(rc);
426 }
427
428 /**
429  * Send update RPC.
430  *
431  * Send update request to the remote MDT synchronously.
432  *
433  * \param[in] env       execution environment
434  * \param[in] imp       import on which ptlrpc request will be sent
435  * \param[in] our       hold all of updates which will be packed into the req
436  * \param[in] reqp      request to be created
437  *
438  * \retval              0 if RPC succeeds.
439  * \retval              negative errno if RPC fails.
440  */
441 int osp_remote_sync(const struct lu_env *env, struct osp_device *osp,
442                     struct osp_update_request *our,
443                     struct ptlrpc_request **reqp)
444 {
445         struct obd_import       *imp = osp->opd_obd->u.cli.cl_import;
446         struct ptlrpc_request   *req = NULL;
447         int                     rc;
448         ENTRY;
449
450         rc = osp_prep_update_req(env, imp, our, &req);
451         if (rc != 0)
452                 RETURN(rc);
453
454         /* This will only be called with read-only update, and these updates
455          * might be used to retrieve update log during recovery process, so
456          * it will be allowed to send during recovery process */
457         req->rq_allow_replay = 1;
458         req->rq_allow_intr = 1;
459
460         /* Note: some dt index api might return non-zero result here, like
461          * osd_index_ea_lookup, so we should only check rc < 0 here */
462         rc = ptlrpc_queue_wait(req);
463         our->our_rc = rc;
464         if (rc < 0 || reqp == NULL)
465                 ptlrpc_req_finished(req);
466         else
467                 *reqp = req;
468
469         RETURN(rc);
470 }
471
472 /**
473  * Invalidate all objects in the osp thandle
474  *
475  * invalidate all of objects in the update request, which will be called
476  * when the transaction is aborted.
477  *
478  * \param[in] oth       osp thandle.
479  */
480 static void osp_thandle_invalidate_object(const struct lu_env *env,
481                                           struct osp_thandle *oth)
482 {
483         struct osp_update_request *our = oth->ot_our;
484         struct osp_update_request_sub *ours;
485
486         if (our == NULL)
487                 return;
488
489         list_for_each_entry(ours, &our->our_req_list, ours_list) {
490                 struct object_update_request *our_req = ours->ours_req;
491                 unsigned int i;
492                 struct lu_object *obj;
493                 struct osp_object *osp_obj;
494
495                 for (i = 0; i < our_req->ourq_count; i++) {
496                         struct object_update *update;
497
498                         update = object_update_request_get(our_req, i, NULL);
499                         if (update == NULL)
500                                 break;
501
502                         if (update->ou_type != OUT_WRITE)
503                                 continue;
504
505                         if (!fid_is_sane(&update->ou_fid))
506                                 continue;
507
508                         obj = lu_object_find_slice(env,
509                                         &oth->ot_super.th_dev->dd_lu_dev,
510                                         &update->ou_fid, NULL);
511                         if (IS_ERR(obj))
512                                 break;
513
514                         osp_obj = lu2osp_obj(obj);
515                         if (osp_obj->opo_ooa != NULL) {
516                                 spin_lock(&osp_obj->opo_lock);
517                                 osp_obj->opo_ooa->ooa_attr.la_valid = 0;
518                                 osp_obj->opo_stale = 1;
519                                 spin_unlock(&osp_obj->opo_lock);
520                         }
521                         lu_object_put(env, obj);
522                 }
523         }
524 }
525
526 static void osp_trans_stop_cb(const struct lu_env *env,
527                               struct osp_thandle *oth, int result)
528 {
529         struct dt_txn_commit_cb *dcb;
530         struct dt_txn_commit_cb *tmp;
531
532         /* call per-transaction stop callbacks if any */
533         list_for_each_entry_safe(dcb, tmp, &oth->ot_stop_dcb_list,
534                                  dcb_linkage) {
535                 LASSERTF(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC,
536                          "commit callback entry: magic=%x name='%s'\n",
537                          dcb->dcb_magic, dcb->dcb_name);
538                 list_del_init(&dcb->dcb_linkage);
539                 dcb->dcb_func(NULL, &oth->ot_super, dcb, result);
540         }
541
542         if (result < 0)
543                 osp_thandle_invalidate_object(env, oth);
544 }
545
546 /**
547  * Allocate an osp request and initialize it with the given parameters.
548  *
549  * \param[in] obj               pointer to the operation target
550  * \param[in] data              pointer to the data used by the interpreter
551  * \param[in] interpreter       pointer to the interpreter function
552  *
553  * \retval                      pointer to the asychronous request
554  * \retval                      NULL if the allocation failed
555  */
556 static struct osp_update_callback *
557 osp_update_callback_init(struct osp_object *obj, void *data,
558                          osp_update_interpreter_t interpreter)
559 {
560         struct osp_update_callback *ouc;
561
562         OBD_ALLOC_PTR(ouc);
563         if (ouc == NULL)
564                 return NULL;
565
566         lu_object_get(osp2lu_obj(obj));
567         INIT_LIST_HEAD(&ouc->ouc_list);
568         ouc->ouc_obj = obj;
569         ouc->ouc_data = data;
570         ouc->ouc_interpreter = interpreter;
571
572         return ouc;
573 }
574
575 /**
576  * Destroy the osp_update_callback.
577  *
578  * \param[in] env       pointer to the thread context
579  * \param[in] ouc       pointer to osp_update_callback
580  */
581 static void osp_update_callback_fini(const struct lu_env *env,
582                                      struct osp_update_callback *ouc)
583 {
584         LASSERT(list_empty(&ouc->ouc_list));
585
586         lu_object_put(env, osp2lu_obj(ouc->ouc_obj));
587         OBD_FREE_PTR(ouc);
588 }
589
590 /**
591  * Interpret the packaged OUT RPC results.
592  *
593  * For every packaged sub-request, call its registered interpreter function.
594  * Then destroy the sub-request.
595  *
596  * \param[in] env       pointer to the thread context
597  * \param[in] req       pointer to the RPC
598  * \param[in] arg       pointer to data used by the interpreter
599  * \param[in] rc        the RPC return value
600  *
601  * \retval              0 for success
602  * \retval              negative error number on failure
603  */
604 static int osp_update_interpret(const struct lu_env *env,
605                                 struct ptlrpc_request *req, void *arg, int rc)
606 {
607         struct object_update_reply      *reply  = NULL;
608         struct osp_update_args          *oaua   = arg;
609         struct osp_update_request       *our = oaua->oaua_update;
610         struct osp_thandle              *oth;
611         struct osp_update_callback      *ouc;
612         struct osp_update_callback      *next;
613         int                              count  = 0;
614         int                              index  = 0;
615         int                              rc1    = 0;
616
617         ENTRY;
618
619         if (our == NULL)
620                 RETURN(0);
621
622         oaua->oaua_update = NULL;
623         oth = our->our_th;
624         if (oaua->oaua_flow_control) {
625                 struct osp_device *osp;
626
627                 LASSERT(oth != NULL);
628                 osp = dt2osp_dev(oth->ot_super.th_dev);
629                 obd_put_request_slot(&osp->opd_obd->u.cli);
630         }
631
632         /* Unpack the results from the reply message. */
633         if (req->rq_repmsg != NULL) {
634                 reply = req_capsule_server_sized_get(&req->rq_pill,
635                                                      &RMF_OUT_UPDATE_REPLY,
636                                                      OUT_UPDATE_REPLY_SIZE);
637                 if (reply == NULL || reply->ourp_magic != UPDATE_REPLY_MAGIC) {
638                         if (rc == 0)
639                                 rc = -EPROTO;
640                 } else {
641                         count = reply->ourp_count;
642                 }
643         }
644
645         list_for_each_entry_safe(ouc, next, &our->our_cb_items, ouc_list) {
646                 list_del_init(&ouc->ouc_list);
647
648                 /* The peer may only have handled some requests (indicated
649                  * by the 'count') in the packaged OUT RPC, we can only get
650                  * results for the handled part. */
651                 if (index < count && reply->ourp_lens[index] > 0 && rc >= 0) {
652                         struct object_update_result *result;
653
654                         result = object_update_result_get(reply, index, NULL);
655                         if (result == NULL)
656                                 rc1 = rc = -EPROTO;
657                         else
658                                 rc1 = rc = result->our_rc;
659                 } else if (rc1 >= 0) {
660                         /* The peer did not handle these request, let's return
661                          * -EINVAL to update interpret for now */
662                         if (rc >= 0)
663                                 rc1 = -EINVAL;
664                         else
665                                 rc1 = rc;
666                 }
667
668                 if (ouc->ouc_interpreter != NULL)
669                         ouc->ouc_interpreter(env, reply, req, ouc->ouc_obj,
670                                              ouc->ouc_data, index, rc1);
671
672                 osp_update_callback_fini(env, ouc);
673                 index++;
674         }
675
676         if (oaua->oaua_count != NULL && atomic_dec_and_test(oaua->oaua_count))
677                 wake_up_all(oaua->oaua_waitq);
678
679         if (oth != NULL) {
680                 /* oth and osp_update_requests will be destoryed in
681                  * osp_thandle_put */
682                 osp_trans_stop_cb(env, oth, rc);
683                 osp_thandle_put(oth);
684         } else {
685                 osp_update_request_destroy(our);
686         }
687
688         RETURN(rc);
689 }
690
691 /**
692  * Pack all the requests in the shared asynchronous idempotent request queue
693  * into a single OUT RPC that will be given to the background ptlrpcd daemon.
694  *
695  * \param[in] env       pointer to the thread context
696  * \param[in] osp       pointer to the OSP device
697  * \param[in] our       pointer to the shared queue
698  *
699  * \retval              0 for success
700  * \retval              negative error number on failure
701  */
702 int osp_unplug_async_request(const struct lu_env *env,
703                              struct osp_device *osp,
704                              struct osp_update_request *our)
705 {
706         struct osp_update_args  *args;
707         struct ptlrpc_request   *req = NULL;
708         int                      rc;
709
710         rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
711                                  our, &req);
712         if (rc != 0) {
713                 struct osp_update_callback *ouc;
714                 struct osp_update_callback *next;
715
716                 list_for_each_entry_safe(ouc, next,
717                                          &our->our_cb_items, ouc_list) {
718                         list_del_init(&ouc->ouc_list);
719                         if (ouc->ouc_interpreter != NULL)
720                                 ouc->ouc_interpreter(env, NULL, NULL,
721                                                      ouc->ouc_obj,
722                                                      ouc->ouc_data, 0, rc);
723                         osp_update_callback_fini(env, ouc);
724                 }
725                 osp_update_request_destroy(our);
726         } else {
727                 args = ptlrpc_req_async_args(req);
728                 args->oaua_update = our;
729                 args->oaua_count = NULL;
730                 args->oaua_waitq = NULL;
731                 args->oaua_flow_control = false;
732                 req->rq_interpret_reply = osp_update_interpret;
733                 ptlrpcd_add_req(req);
734         }
735
736         return rc;
737 }
738
739 /**
740  * Find or create (if NOT exist or purged) the shared asynchronous idempotent
741  * request queue - osp_device::opd_async_requests.
742  *
743  * If the osp_device::opd_async_requests is not NULL, then return it directly;
744  * otherwise create new osp_update_request and attach it to opd_async_requests.
745  *
746  * \param[in] osp       pointer to the OSP device
747  *
748  * \retval              pointer to the shared queue
749  * \retval              negative error number on failure
750  */
751 static struct osp_update_request *
752 osp_find_or_create_async_update_request(struct osp_device *osp)
753 {
754         struct osp_update_request *our = osp->opd_async_requests;
755
756         if (our != NULL)
757                 return our;
758
759         our = osp_update_request_create(&osp->opd_dt_dev);
760         if (IS_ERR(our))
761                 return our;
762
763         osp->opd_async_requests = our;
764
765         return our;
766 }
767
768 /**
769  * Insert an osp_update_callback into the osp_update_request.
770  *
771  * Insert an osp_update_callback to the osp_update_request. Usually each update
772  * in the osp_update_request will have one correspondent callback, and these
773  * callbacks will be called in rq_interpret_reply.
774  *
775  * \param[in] env               pointer to the thread context
776  * \param[in] obj               pointer to the operation target object
777  * \param[in] data              pointer to the data used by the interpreter
778  * \param[in] interpreter       pointer to the interpreter function
779  *
780  * \retval                      0 for success
781  * \retval                      negative error number on failure
782  */
783 int osp_insert_update_callback(const struct lu_env *env,
784                                struct osp_update_request *our,
785                                struct osp_object *obj, void *data,
786                                osp_update_interpreter_t interpreter)
787 {
788         struct osp_update_callback  *ouc;
789
790         ouc = osp_update_callback_init(obj, data, interpreter);
791         if (ouc == NULL)
792                 RETURN(-ENOMEM);
793
794         list_add_tail(&ouc->ouc_list, &our->our_cb_items);
795
796         return 0;
797 }
798
799 /**
800  * Insert an asynchronous idempotent request to the shared request queue that
801  * is attached to the osp_device.
802  *
803  * This function generates a new osp_async_request with the given parameters,
804  * then tries to insert the request into the osp_device-based shared request
805  * queue. If the queue is full, then triggers the packaged OUT RPC to purge
806  * the shared queue firstly, and then re-tries.
807  *
808  * NOTE: must hold the osp::opd_async_requests_mutex to serialize concurrent
809  *       osp_insert_async_request call from others.
810  *
811  * \param[in] env               pointer to the thread context
812  * \param[in] op                operation type, see 'enum update_type'
813  * \param[in] obj               pointer to the operation target
814  * \param[in] count             array size of the subsequent \a lens and \a bufs
815  * \param[in] lens              buffer length array for the subsequent \a bufs
816  * \param[in] bufs              the buffers to compose the request
817  * \param[in] data              pointer to the data used by the interpreter
818  * \param[in] repsize           how many bytes the caller allocated for \a data
819  * \param[in] interpreter       pointer to the interpreter function
820  *
821  * \retval                      0 for success
822  * \retval                      negative error number on failure
823  */
824 int osp_insert_async_request(const struct lu_env *env, enum update_type op,
825                              struct osp_object *obj, int count,
826                              __u16 *lens, const void **bufs,
827                              void *data, __u32 repsize,
828                              osp_update_interpreter_t interpreter)
829 {
830         struct osp_device               *osp;
831         struct osp_update_request       *our;
832         struct object_update            *object_update;
833         size_t                          max_update_size;
834         struct object_update_request    *ureq;
835         struct osp_update_request_sub   *ours;
836         int                             rc = 0;
837         ENTRY;
838
839         osp = lu2osp_dev(osp2lu_obj(obj)->lo_dev);
840         our = osp_find_or_create_async_update_request(osp);
841         if (IS_ERR(our))
842                 RETURN(PTR_ERR(our));
843
844 again:
845         ours = osp_current_object_update_request(our);
846
847         ureq = ours->ours_req;
848         max_update_size = ours->ours_req_size -
849                           object_update_request_size(ureq);
850
851         object_update = update_buffer_get_update(ureq, ureq->ourq_count);
852         rc = out_update_pack(env, object_update, &max_update_size, op,
853                              lu_object_fid(osp2lu_obj(obj)), count, lens, bufs,
854                              repsize);
855         /* The queue is full. */
856         if (rc == -E2BIG) {
857                 osp->opd_async_requests = NULL;
858                 mutex_unlock(&osp->opd_async_requests_mutex);
859
860                 rc = osp_unplug_async_request(env, osp, our);
861                 mutex_lock(&osp->opd_async_requests_mutex);
862                 if (rc != 0)
863                         RETURN(rc);
864
865                 our = osp_find_or_create_async_update_request(osp);
866                 if (IS_ERR(our))
867                         RETURN(PTR_ERR(our));
868
869                 goto again;
870         } else {
871                 if (rc < 0)
872                         RETURN(rc);
873
874                 ureq->ourq_count++;
875                 our->our_update_nr++;
876         }
877
878         rc = osp_insert_update_callback(env, our, obj, data, interpreter);
879
880         RETURN(rc);
881 }
882
883 int osp_trans_update_request_create(struct thandle *th)
884 {
885         struct osp_thandle              *oth = thandle_to_osp_thandle(th);
886         struct osp_update_request       *our;
887
888         if (oth->ot_our != NULL)
889                 return 0;
890
891         our = osp_update_request_create(th->th_dev);
892         if (IS_ERR(our)) {
893                 th->th_result = PTR_ERR(our);
894                 return PTR_ERR(our);
895         }
896
897         oth->ot_our = our;
898         our->our_th = oth;
899
900         return 0;
901 }
902
903 void osp_thandle_destroy(struct osp_thandle *oth)
904 {
905         LASSERT(oth->ot_magic == OSP_THANDLE_MAGIC);
906         LASSERT(list_empty(&oth->ot_commit_dcb_list));
907         LASSERT(list_empty(&oth->ot_stop_dcb_list));
908         if (oth->ot_our != NULL)
909                 osp_update_request_destroy(oth->ot_our);
910         OBD_FREE_PTR(oth);
911 }
912
913 /**
914  * The OSP layer dt_device_operations::dt_trans_create() interface
915  * to create a transaction.
916  *
917  * There are two kinds of transactions that will involve OSP:
918  *
919  * 1) If the transaction only contains the updates on remote server
920  *    (MDT or OST), such as re-generating the lost OST-object for
921  *    LFSCK, then it is a remote transaction. For remote transaction,
922  *    the upper layer caller (such as the LFSCK engine) will call the
923  *    dt_trans_create() (with the OSP dt_device as the parameter),
924  *    then the call will be directed to the osp_trans_create() that
925  *    creates the transaction handler and returns it to the caller.
926  *
927  * 2) If the transcation contains both local and remote updates,
928  *    such as cross MDTs create under DNE mode, then the upper layer
929  *    caller will not trigger osp_trans_create(). Instead, it will
930  *    call dt_trans_create() on other dt_device, such as LOD that
931  *    will generate the transaction handler. Such handler will be
932  *    used by the whole transaction in subsequent sub-operations.
933  *
934  * \param[in] env       pointer to the thread context
935  * \param[in] d         pointer to the OSP dt_device
936  *
937  * \retval              pointer to the transaction handler
938  * \retval              negative error number on failure
939  */
940 struct thandle *osp_trans_create(const struct lu_env *env, struct dt_device *d)
941 {
942         struct osp_thandle              *oth;
943         struct thandle                  *th = NULL;
944         ENTRY;
945
946         OBD_ALLOC_PTR(oth);
947         if (unlikely(oth == NULL))
948                 RETURN(ERR_PTR(-ENOMEM));
949
950         oth->ot_magic = OSP_THANDLE_MAGIC;
951         th = &oth->ot_super;
952         th->th_dev = d;
953         th->th_tags = LCT_TX_HANDLE;
954
955         atomic_set(&oth->ot_refcount, 1);
956         INIT_LIST_HEAD(&oth->ot_commit_dcb_list);
957         INIT_LIST_HEAD(&oth->ot_stop_dcb_list);
958
959         RETURN(th);
960 }
961
962 /**
963  * Add commit callback to transaction.
964  *
965  * Add commit callback to the osp thandle, which will be called
966  * when the thandle is committed remotely.
967  *
968  * \param[in] th        the thandle
969  * \param[in] dcb       commit callback structure
970  *
971  * \retval              only return 0 for now.
972  */
973 int osp_trans_cb_add(struct thandle *th, struct dt_txn_commit_cb *dcb)
974 {
975         struct osp_thandle *oth = thandle_to_osp_thandle(th);
976
977         LASSERT(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC);
978         LASSERT(&dcb->dcb_func != NULL);
979         if (dcb->dcb_flags & DCB_TRANS_STOP)
980                 list_add(&dcb->dcb_linkage, &oth->ot_stop_dcb_list);
981         else
982                 list_add(&dcb->dcb_linkage, &oth->ot_commit_dcb_list);
983         return 0;
984 }
985
986 static void osp_trans_commit_cb(struct osp_thandle *oth, int result)
987 {
988         struct dt_txn_commit_cb *dcb;
989         struct dt_txn_commit_cb *tmp;
990
991         LASSERT(atomic_read(&oth->ot_refcount) > 0);
992         /* call per-transaction callbacks if any */
993         list_for_each_entry_safe(dcb, tmp, &oth->ot_commit_dcb_list,
994                                  dcb_linkage) {
995                 LASSERTF(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC,
996                          "commit callback entry: magic=%x name='%s'\n",
997                          dcb->dcb_magic, dcb->dcb_name);
998                 list_del_init(&dcb->dcb_linkage);
999                 dcb->dcb_func(NULL, &oth->ot_super, dcb, result);
1000         }
1001 }
1002
1003 static void osp_request_commit_cb(struct ptlrpc_request *req)
1004 {
1005         struct thandle          *th = req->rq_cb_data;
1006         struct osp_thandle      *oth;
1007         __u64                   last_committed_transno = 0;
1008         int                     result = req->rq_status;
1009         ENTRY;
1010
1011         if (th == NULL)
1012                 RETURN_EXIT;
1013
1014         oth = thandle_to_osp_thandle(th);
1015         if (req->rq_repmsg != NULL &&
1016             lustre_msg_get_last_committed(req->rq_repmsg))
1017                 last_committed_transno =
1018                         lustre_msg_get_last_committed(req->rq_repmsg);
1019
1020         if (last_committed_transno <
1021                 req->rq_import->imp_peer_committed_transno)
1022                 last_committed_transno =
1023                         req->rq_import->imp_peer_committed_transno;
1024
1025         CDEBUG(D_HA, "trans no "LPU64" committed transno "LPU64"\n",
1026                req->rq_transno, last_committed_transno);
1027
1028         /* If the transaction is not really committed, mark result = 1 */
1029         if (req->rq_transno != 0 &&
1030             (req->rq_transno > last_committed_transno) && result == 0)
1031                 result = 1;
1032
1033         osp_trans_commit_cb(oth, result);
1034         req->rq_committed = 1;
1035         osp_thandle_put(oth);
1036         EXIT;
1037 }
1038
1039 /**
1040  * callback of osp transaction
1041  *
1042  * Call all of callbacks for this osp thandle. This will only be
1043  * called in error handler path. In the normal processing path,
1044  * these callback will be called in osp_request_commit_cb() and
1045  * osp_update_interpret().
1046  *
1047  * \param [in] env      execution environment
1048  * \param [in] oth      osp thandle
1049  * \param [in] rc       result of the osp thandle
1050  */
1051 void osp_trans_callback(const struct lu_env *env,
1052                         struct osp_thandle *oth, int rc)
1053 {
1054         struct osp_update_callback *ouc;
1055         struct osp_update_callback *next;
1056
1057         if (oth->ot_our != NULL) {
1058                 list_for_each_entry_safe(ouc, next,
1059                                          &oth->ot_our->our_cb_items, ouc_list) {
1060                         list_del_init(&ouc->ouc_list);
1061                         if (ouc->ouc_interpreter != NULL)
1062                                 ouc->ouc_interpreter(env, NULL, NULL,
1063                                                      ouc->ouc_obj,
1064                                                      ouc->ouc_data, 0, rc);
1065                         osp_update_callback_fini(env, ouc);
1066                 }
1067         }
1068         osp_trans_stop_cb(env, oth, rc);
1069         osp_trans_commit_cb(oth, rc);
1070 }
1071
1072 /**
1073  * Send the request for remote updates.
1074  *
1075  * Send updates to the remote MDT. Prepare the request by osp_update_req
1076  * and send them to remote MDT, for sync request, it will wait
1077  * until the reply return, otherwise hand it to ptlrpcd.
1078  *
1079  * Please refer to osp_trans_create() for transaction type.
1080  *
1081  * \param[in] env               pointer to the thread context
1082  * \param[in] osp               pointer to the OSP device
1083  * \param[in] our               pointer to the osp_update_request
1084  *
1085  * \retval                      0 for success
1086  * \retval                      negative error number on failure
1087  */
1088 static int osp_send_update_req(const struct lu_env *env,
1089                                struct osp_device *osp,
1090                                struct osp_update_request *our)
1091 {
1092         struct osp_update_args  *args;
1093         struct ptlrpc_request   *req;
1094         struct lu_device *top_device;
1095         struct osp_thandle      *oth = our->our_th;
1096         int     rc = 0;
1097         ENTRY;
1098
1099         LASSERT(oth != NULL);
1100         rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
1101                                  our, &req);
1102         if (rc != 0) {
1103                 osp_trans_callback(env, oth, rc);
1104                 RETURN(rc);
1105         }
1106
1107         args = ptlrpc_req_async_args(req);
1108         args->oaua_update = our;
1109         osp_thandle_get(oth); /* hold for update interpret */
1110         req->rq_interpret_reply = osp_update_interpret;
1111         if (!oth->ot_super.th_wait_submit && !oth->ot_super.th_sync) {
1112                 if (!osp->opd_imp_active || !osp->opd_imp_connected) {
1113                         osp_trans_callback(env, oth, rc);
1114                         osp_thandle_put(oth);
1115                         GOTO(out, rc = -ENOTCONN);
1116                 }
1117
1118                 rc = obd_get_request_slot(&osp->opd_obd->u.cli);
1119                 if (rc != 0) {
1120                         osp_trans_callback(env, oth, rc);
1121                         osp_thandle_put(oth);
1122                         GOTO(out, rc = -ENOTCONN);
1123                 }
1124                 args->oaua_flow_control = true;
1125
1126                 if (!osp->opd_connect_mdt) {
1127                         down_read(&osp->opd_async_updates_rwsem);
1128                         args->oaua_count = &osp->opd_async_updates_count;
1129                         args->oaua_waitq = &osp->opd_syn_barrier_waitq;
1130                         up_read(&osp->opd_async_updates_rwsem);
1131                         atomic_inc(args->oaua_count);
1132                 }
1133
1134                 ptlrpcd_add_req(req);
1135                 req = NULL;
1136         } else {
1137                 osp_thandle_get(oth); /* hold for commit callback */
1138                 req->rq_commit_cb = osp_request_commit_cb;
1139                 req->rq_cb_data = &oth->ot_super;
1140                 args->oaua_flow_control = false;
1141
1142                 /* If the transaction is created during MDT recoverying
1143                  * process, it means this is an recovery update, we need
1144                  * to let OSP send it anyway without checking recoverying
1145                  * status, in case the other target is being recoveried
1146                  * at the same time, and if we wait here for the import
1147                  * to be recoveryed, it might cause deadlock */
1148                 top_device = osp->opd_dt_dev.dd_lu_dev.ld_site->ls_top_dev;
1149                 if (top_device->ld_obd->obd_recovering)
1150                         req->rq_allow_replay = 1;
1151
1152                 if (osp->opd_connect_mdt)
1153                         osp_get_rpc_lock(osp);
1154                 rc = ptlrpc_queue_wait(req);
1155                 if (osp->opd_connect_mdt)
1156                         osp_put_rpc_lock(osp);
1157                 if ((rc == -ENOMEM && req->rq_set == NULL) ||
1158                     (req->rq_transno == 0 && !req->rq_committed)) {
1159                         if (args->oaua_update != NULL) {
1160                                 /* If osp_update_interpret is not being called,
1161                                  * release the osp_thandle */
1162                                 args->oaua_update = NULL;
1163                                 osp_thandle_put(oth);
1164                         }
1165
1166                         req->rq_cb_data = NULL;
1167                         rc = rc == 0 ? req->rq_status : rc;
1168                         osp_trans_callback(env, oth, rc);
1169                         osp_thandle_put(oth);
1170                         GOTO(out, rc);
1171                 }
1172         }
1173 out:
1174         if (req != NULL)
1175                 ptlrpc_req_finished(req);
1176
1177         RETURN(rc);
1178 }
1179
1180 /**
1181  * Get local thandle for osp_thandle
1182  *
1183  * Get the local OSD thandle from the OSP thandle. Currently, there
1184  * are a few OSP API (osp_object_create() and osp_sync_add()) needs
1185  * to update the object on local OSD device.
1186  *
1187  * If the osp_thandle comes from normal stack (MDD->LOD->OSP), then
1188  * we will get local thandle by thandle_get_sub_by_dt.
1189  *
1190  * If the osp_thandle is remote thandle (th_top == NULL, only used
1191  * by LFSCK), then it will create a local thandle, and stop it in
1192  * osp_trans_stop(). And this only happens on OSP for OST.
1193  *
1194  * These are temporary solution, once OSP accessing OSD object is
1195  * being fixed properly, this function should be removed. XXX
1196  *
1197  * \param[in] env               pointer to the thread context
1198  * \param[in] th                pointer to the transaction handler
1199  * \param[in] dt                pointer to the OSP device
1200  *
1201  * \retval                      pointer to the local thandle
1202  * \retval                      ERR_PTR(errno) if it fails.
1203  **/
1204 struct thandle *osp_get_storage_thandle(const struct lu_env *env,
1205                                         struct thandle *th,
1206                                         struct osp_device *osp)
1207 {
1208         struct osp_thandle      *oth;
1209         struct thandle          *local_th;
1210
1211         if (th->th_top != NULL)
1212                 return thandle_get_sub_by_dt(env, th->th_top,
1213                                              osp->opd_storage);
1214
1215         LASSERT(!osp->opd_connect_mdt);
1216         oth = thandle_to_osp_thandle(th);
1217         if (oth->ot_storage_th != NULL)
1218                 return oth->ot_storage_th;
1219
1220         local_th = dt_trans_create(env, osp->opd_storage);
1221         if (IS_ERR(local_th))
1222                 return local_th;
1223
1224         oth->ot_storage_th = local_th;
1225
1226         return local_th;
1227 }
1228
1229 /**
1230  * Set version for the transaction
1231  *
1232  * Set the version for the transaction and add the request to
1233  * the sending list, then after transaction stop, the request
1234  * will be picked in the order of version, by sending thread.
1235  *
1236  * \param [in] oth      osp thandle to be set version.
1237  *
1238  * \retval              0 if set version succeeds
1239  *                      negative errno if set version fails.
1240  */
1241 int osp_check_and_set_rpc_version(struct osp_thandle *oth,
1242                                   struct osp_object *obj)
1243 {
1244         struct osp_device *osp = dt2osp_dev(oth->ot_super.th_dev);
1245         struct osp_updates *ou = osp->opd_update;
1246
1247         if (ou == NULL)
1248                 return -EIO;
1249
1250         if (oth->ot_our->our_version != 0)
1251                 return 0;
1252
1253         spin_lock(&ou->ou_lock);
1254         spin_lock(&oth->ot_our->our_list_lock);
1255         if (obj->opo_stale) {
1256                 spin_unlock(&oth->ot_our->our_list_lock);
1257                 spin_unlock(&ou->ou_lock);
1258                 return -ESTALE;
1259         }
1260
1261         /* Assign the version and add it to the sending list */
1262         osp_thandle_get(oth);
1263         oth->ot_our->our_version = ou->ou_version++;
1264         list_add_tail(&oth->ot_our->our_list,
1265                       &osp->opd_update->ou_list);
1266         oth->ot_our->our_req_ready = 0;
1267         spin_unlock(&oth->ot_our->our_list_lock);
1268         spin_unlock(&ou->ou_lock);
1269
1270         LASSERT(oth->ot_super.th_wait_submit == 1);
1271         CDEBUG(D_INFO, "%s: version "LPU64" oth:version %p:"LPU64"\n",
1272                osp->opd_obd->obd_name, ou->ou_version, oth,
1273                oth->ot_our->our_version);
1274
1275         return 0;
1276 }
1277
1278 /**
1279  * Get next OSP update request in the sending list
1280  * Get next OSP update request in the sending list by version number, next
1281  * request will be
1282  * 1. transaction which does not have a version number.
1283  * 2. transaction whose version == opd_rpc_version.
1284  *
1285  * \param [in] ou       osp update structure.
1286  * \param [out] ourp    the pointer holding the next update request.
1287  *
1288  * \retval              true if getting the next transaction.
1289  * \retval              false if not getting the next transaction.
1290  */
1291 static bool
1292 osp_get_next_request(struct osp_updates *ou, struct osp_update_request **ourp)
1293 {
1294         struct osp_update_request *our;
1295         struct osp_update_request *tmp;
1296         bool                    got_req = false;
1297
1298         spin_lock(&ou->ou_lock);
1299         list_for_each_entry_safe(our, tmp, &ou->ou_list, our_list) {
1300                 LASSERT(our->our_th != NULL);
1301                 CDEBUG(D_HA, "ou %p version "LPU64" rpc_version "LPU64"\n",
1302                        ou, our->our_version, ou->ou_rpc_version);
1303                 spin_lock(&our->our_list_lock);
1304                 /* Find next osp_update_request in the list */
1305                 if (our->our_version == ou->ou_rpc_version &&
1306                     our->our_req_ready) {
1307                         list_del_init(&our->our_list);
1308                         spin_unlock(&our->our_list_lock);
1309                         *ourp = our;
1310                         got_req = true;
1311                         break;
1312                 }
1313                 spin_unlock(&our->our_list_lock);
1314         }
1315         spin_unlock(&ou->ou_lock);
1316
1317         return got_req;
1318 }
1319
1320 /**
1321  * Invalidate update request
1322  *
1323  * Invalidate update request in the OSP sending list, so all of
1324  * requests in the sending list will return error, which happens
1325  * when it finds one update (with writing llog) requests fails or
1326  * the OSP is evicted by remote target. see osp_send_update_thread().
1327  *
1328  * \param[in] osp       OSP device whose update requests will be
1329  *                      invalidated.
1330  **/
1331 void osp_invalidate_request(struct osp_device *osp)
1332 {
1333         struct lu_env env;
1334         struct osp_updates *ou = osp->opd_update;
1335         struct osp_update_request *our;
1336         struct osp_update_request *tmp;
1337         LIST_HEAD(list);
1338         int                     rc;
1339         ENTRY;
1340
1341         if (ou == NULL)
1342                 return;
1343
1344         rc = lu_env_init(&env, osp->opd_dt_dev.dd_lu_dev.ld_type->ldt_ctx_tags);
1345         if (rc < 0) {
1346                 CERROR("%s: init env error: rc = %d\n", osp->opd_obd->obd_name,
1347                        rc);
1348                 return;
1349         }
1350
1351         INIT_LIST_HEAD(&list);
1352
1353         spin_lock(&ou->ou_lock);
1354         /* invalidate all of request in the sending list */
1355         list_for_each_entry_safe(our, tmp, &ou->ou_list, our_list) {
1356                 spin_lock(&our->our_list_lock);
1357                 if (our->our_req_ready)
1358                         list_move(&our->our_list, &list);
1359                 else
1360                         list_del_init(&our->our_list);
1361
1362                 if (our->our_th->ot_super.th_result == 0)
1363                         our->our_th->ot_super.th_result = -EIO;
1364
1365                 if (our->our_version >= ou->ou_rpc_version)
1366                         ou->ou_rpc_version = our->our_version + 1;
1367                 spin_unlock(&our->our_list_lock);
1368
1369                 CDEBUG(D_HA, "%s invalidate our %p\n", osp->opd_obd->obd_name,
1370                        our);
1371         }
1372
1373         spin_unlock(&ou->ou_lock);
1374
1375         /* invalidate all of request in the sending list */
1376         list_for_each_entry_safe(our, tmp, &list, our_list) {
1377                 spin_lock(&our->our_list_lock);
1378                 list_del_init(&our->our_list);
1379                 spin_unlock(&our->our_list_lock);
1380                 osp_trans_callback(&env, our->our_th,
1381                                    our->our_th->ot_super.th_result);
1382                 osp_thandle_put(our->our_th);
1383         }
1384         lu_env_fini(&env);
1385 }
1386
1387 /**
1388  * Sending update thread
1389  *
1390  * Create thread to send update request to other MDTs, this thread will pull
1391  * out update request from the list in OSP by version number, i.e. it will
1392  * make sure the update request with lower version number will be sent first.
1393  *
1394  * \param[in] arg       hold the OSP device.
1395  *
1396  * \retval              0 if the thread is created successfully.
1397  * \retal               negative error if the thread is not created
1398  *                      successfully.
1399  */
1400 int osp_send_update_thread(void *arg)
1401 {
1402         struct lu_env           env;
1403         struct osp_device       *osp = arg;
1404         struct l_wait_info       lwi = { 0 };
1405         struct osp_updates      *ou = osp->opd_update;
1406         struct ptlrpc_thread    *thread = &osp->opd_update_thread;
1407         struct osp_update_request *our = NULL;
1408         int                     rc;
1409         ENTRY;
1410
1411         LASSERT(ou != NULL);
1412         rc = lu_env_init(&env, osp->opd_dt_dev.dd_lu_dev.ld_type->ldt_ctx_tags);
1413         if (rc < 0) {
1414                 CERROR("%s: init env error: rc = %d\n", osp->opd_obd->obd_name,
1415                        rc);
1416                 RETURN(rc);
1417         }
1418
1419         thread->t_flags = SVC_RUNNING;
1420         wake_up(&thread->t_ctl_waitq);
1421         while (1) {
1422                 our = NULL;
1423                 l_wait_event(ou->ou_waitq,
1424                              !osp_send_update_thread_running(osp) ||
1425                              osp_get_next_request(ou, &our), &lwi);
1426
1427                 if (!osp_send_update_thread_running(osp)) {
1428                         if (our != NULL) {
1429                                 osp_trans_callback(&env, our->our_th, -EINTR);
1430                                 osp_thandle_put(our->our_th);
1431                         }
1432                         break;
1433                 }
1434
1435                 LASSERT(our->our_th != NULL);
1436                 if (our->our_th->ot_super.th_result != 0) {
1437                         osp_trans_callback(&env, our->our_th,
1438                                 our->our_th->ot_super.th_result);
1439                         rc = our->our_th->ot_super.th_result;
1440                 } else if (OBD_FAIL_CHECK(OBD_FAIL_INVALIDATE_UPDATE)) {
1441                         rc = -EIO;
1442                         osp_trans_callback(&env, our->our_th, rc);
1443                 } else {
1444                         rc = osp_send_update_req(&env, osp, our);
1445                 }
1446
1447                 /* Update the rpc version */
1448                 spin_lock(&ou->ou_lock);
1449                 if (our->our_version == ou->ou_rpc_version)
1450                         ou->ou_rpc_version++;
1451                 spin_unlock(&ou->ou_lock);
1452
1453                 /* If one update request fails, let's fail all of the requests
1454                  * in the sending list, because the request in the sending
1455                  * list are dependent on either other, continue sending these
1456                  * request might cause llog or filesystem corruption */
1457                 if (rc < 0)
1458                         osp_invalidate_request(osp);
1459
1460                 /* Balanced for thandle_get in osp_check_and_set_rpc_version */
1461                 osp_thandle_put(our->our_th);
1462         }
1463
1464         thread->t_flags = SVC_STOPPED;
1465         lu_env_fini(&env);
1466         wake_up(&thread->t_ctl_waitq);
1467
1468         RETURN(0);
1469 }
1470
1471 /**
1472  * The OSP layer dt_device_operations::dt_trans_start() interface
1473  * to start the transaction.
1474  *
1475  * If the transaction is a remote transaction, then related remote
1476  * updates will be triggered in the osp_trans_stop().
1477  * Please refer to osp_trans_create() for transaction type.
1478  *
1479  * \param[in] env               pointer to the thread context
1480  * \param[in] dt                pointer to the OSP dt_device
1481  * \param[in] th                pointer to the transaction handler
1482  *
1483  * \retval                      0 for success
1484  * \retval                      negative error number on failure
1485  */
1486 int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
1487                     struct thandle *th)
1488 {
1489         struct osp_thandle      *oth = thandle_to_osp_thandle(th);
1490
1491         if (oth->ot_super.th_sync)
1492                 oth->ot_our->our_flags |= UPDATE_FL_SYNC;
1493         /* For remote thandle, if there are local thandle, start it here*/
1494         if (is_only_remote_trans(th) && oth->ot_storage_th != NULL)
1495                 return dt_trans_start(env, oth->ot_storage_th->th_dev,
1496                                       oth->ot_storage_th);
1497         return 0;
1498 }
1499
1500 /**
1501  * The OSP layer dt_device_operations::dt_trans_stop() interface
1502  * to stop the transaction.
1503  *
1504  * If the transaction is a remote transaction, related remote
1505  * updates will be triggered at the end of this function.
1506  *
1507  * For synchronous mode update or any failed update, the request
1508  * will be destroyed explicitly when the osp_trans_stop().
1509  *
1510  * Please refer to osp_trans_create() for transaction type.
1511  *
1512  * \param[in] env               pointer to the thread context
1513  * \param[in] dt                pointer to the OSP dt_device
1514  * \param[in] th                pointer to the transaction handler
1515  *
1516  * \retval                      0 for success
1517  * \retval                      negative error number on failure
1518  */
1519 int osp_trans_stop(const struct lu_env *env, struct dt_device *dt,
1520                    struct thandle *th)
1521 {
1522         struct osp_thandle       *oth = thandle_to_osp_thandle(th);
1523         struct osp_update_request *our = oth->ot_our;
1524         struct osp_device        *osp = dt2osp_dev(dt);
1525         int                      rc = 0;
1526         ENTRY;
1527
1528         /* For remote transaction, if there is local storage thandle,
1529          * stop it first */
1530         if (oth->ot_storage_th != NULL && th->th_top == NULL) {
1531                 dt_trans_stop(env, oth->ot_storage_th->th_dev,
1532                               oth->ot_storage_th);
1533                 oth->ot_storage_th = NULL;
1534         }
1535
1536         if (our == NULL || list_empty(&our->our_req_list)) {
1537                 osp_trans_callback(env, oth, th->th_result);
1538                 GOTO(out, rc = th->th_result);
1539         }
1540
1541         if (!osp->opd_connect_mdt) {
1542                 osp_trans_callback(env, oth, th->th_result);
1543                 rc = osp_send_update_req(env, osp, oth->ot_our);
1544                 GOTO(out, rc);
1545         }
1546
1547         if (osp->opd_update == NULL ||
1548             !osp_send_update_thread_running(osp)) {
1549                 osp_trans_callback(env, oth, -EIO);
1550                 GOTO(out, rc = -EIO);
1551         }
1552
1553         CDEBUG(D_HA, "%s: add oth %p with version "LPU64"\n",
1554                osp->opd_obd->obd_name, oth, our->our_version);
1555
1556         LASSERT(our->our_req_ready == 0);
1557         spin_lock(&our->our_list_lock);
1558         if (likely(!list_empty(&our->our_list))) {
1559                 /* notify sending thread */
1560                 our->our_req_ready = 1;
1561                 wake_up(&osp->opd_update->ou_waitq);
1562                 spin_unlock(&our->our_list_lock);
1563         } else if (th->th_result == 0) {
1564                 /* if the request does not needs to be serialized,
1565                  * read-only request etc, let's send it right away */
1566                 spin_unlock(&our->our_list_lock);
1567                 rc = osp_send_update_req(env, osp, our);
1568         } else {
1569                 spin_unlock(&our->our_list_lock);
1570                 osp_trans_callback(env, oth, th->th_result);
1571         }
1572 out:
1573         osp_thandle_put(oth);
1574
1575         RETURN(rc);
1576 }