4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2013, Intel Corporation.
26 * lustre/osp/osp_trans.c
28 * Author: Di Wang <di.wang@intel.com>
29 * Author: Fan, Yong <fan.yong@intel.com>
32 #define DEBUG_SUBSYSTEM S_MDS
34 #include "osp_internal.h"
/*
 * Per-request arguments stored in the RPC's async-args area and read back
 * by osp_async_update_interpret().
 */
36 struct osp_async_update_args {
37 struct dt_update_request *oaua_update; /* update request carried by the RPC */
38 bool oaua_flow_control; /* tested by the interpreter before it handles the reply */
/*
 * One queued callback: links an object and its interpreter onto an update
 * request's dur_cb_items list.
 * NOTE(review): at least one member line is missing from this listing.
 */
41 struct osp_async_update_item {
42 struct list_head oaui_list; /* linkage on dt_update_request::dur_cb_items */
43 struct osp_object *oaui_obj; /* object the callback is for; a reference is held */
45 osp_async_update_interpterer_t oaui_interpterer; /* callback run with the update's result */
/*
 * Prepare a callback item for obj: take a reference on the object, initialise
 * the list linkage and record the caller's data and interpreter.
 * NOTE(review): the allocation, the oaui_obj assignment and the return
 * statement are not visible in this listing.
 */
48 static struct osp_async_update_item *
49 osp_async_update_item_init(struct osp_object *obj, void *data,
50 osp_async_update_interpterer_t interpterer)
52 struct osp_async_update_item *oaui;
/* Pin the object while the item exists; the matching put is in
 * osp_async_update_item_fini(). */
58 lu_object_get(osp2lu_obj(obj));
59 INIT_LIST_HEAD(&oaui->oaui_list);
61 oaui->oaui_data = data;
62 oaui->oaui_interpterer = interpterer;
/*
 * Release a callback item. The item must already be unlinked from any list
 * (asserted); the object reference taken at init time is dropped.
 * NOTE(review): freeing of the item itself is not visible in this listing.
 */
67 static void osp_async_update_item_fini(const struct lu_env *env,
68 struct osp_async_update_item *oaui)
70 LASSERT(list_empty(&oaui->oaui_list));
72 lu_object_put(env, osp2lu_obj(oaui->oaui_obj));
/*
 * Reply interpreter for asynchronous update RPCs (installed as
 * rq_interpret_reply by the senders in this file). For each callback item
 * queued on dur_cb_items it invokes the item's interpreter with that item's
 * result code, releases the item, and finally destroys the update request.
 * NOTE(review): several lines of this function (braces, some local
 * declarations, else-branches and the return) are not visible in this
 * listing.
 */
76 static int osp_async_update_interpret(const struct lu_env *env,
77 struct ptlrpc_request *req,
80 struct object_update_reply *reply = NULL;
81 struct osp_async_update_args *oaua = arg;
82 struct dt_update_request *dt_update = oaua->oaua_update;
83 struct osp_async_update_item *oaui;
84 struct osp_async_update_item *next;
/* Only flow-controlled requests take this step; the call receives the OSP's
 * client_obd (presumably to give back a request slot -- TODO confirm, the
 * callee's name is not visible here). */
89 if (oaua->oaua_flow_control)
91 &dt2osp_dev(dt_update->dur_dt)->opd_obd->u.cli);
/* The reply buffer is examined on success, and also on failure whenever a
 * reply message is attached to the request. */
93 if (rc == 0 || req->rq_repmsg != NULL) {
94 reply = req_capsule_server_sized_get(&req->rq_pill,
95 &RMF_OUT_UPDATE_REPLY,
96 OUT_UPDATE_REPLY_SIZE);
/* A missing reply or one with the wrong magic is not trusted. */
97 if (reply == NULL || reply->ourp_magic != UPDATE_REPLY_MAGIC)
100 count = reply->ourp_count;
/* Safe iteration: each item is unlinked and released inside the loop. */
105 list_for_each_entry_safe(oaui, next, &dt_update->dur_cb_items,
107 list_del_init(&oaui->oaui_list);
/* Use the per-update result only if the reply has a non-empty entry at
 * this index. */
108 if (index < count && reply->ourp_lens[index] > 0) {
109 struct object_update_result *result;
111 result = object_update_result_get(reply, index, NULL);
115 rc1 = result->our_rc;
118 if (unlikely(rc1 == 0))
122 oaui->oaui_interpterer(env, reply, req, oaui->oaui_obj,
123 oaui->oaui_data, index, rc1);
124 osp_async_update_item_fini(env, oaui);
/* All callbacks have run; the update request is no longer needed. */
128 out_destroy_update_req(dt_update);
/*
 * Send the updates accumulated in the given request as one asynchronous RPC.
 * On the failure path every queued callback is invoked with NULL reply and
 * request pointers and the error code, the items are released and the update
 * request is destroyed. Otherwise the request is queued to ptlrpcd with
 * osp_async_update_interpret() as its reply interpreter.
 * NOTE(review): the branch conditions and the return are not visible in this
 * listing; the first path presumably runs when out_prep_update_req() fails.
 */
133 int osp_unplug_async_update(const struct lu_env *env,
134 struct osp_device *osp,
135 struct dt_update_request *update)
137 struct osp_async_update_args *args;
138 struct ptlrpc_request *req = NULL;
141 rc = out_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
142 update->dur_req, &req);
144 struct osp_async_update_item *oaui;
145 struct osp_async_update_item *next;
147 list_for_each_entry_safe(oaui, next,
148 &update->dur_cb_items, oaui_list) {
/* Unlink before release: osp_async_update_item_fini() asserts the item is
 * no longer on a list. */
149 list_del_init(&oaui->oaui_list);
150 oaui->oaui_interpterer(env, NULL, NULL, oaui->oaui_obj,
151 oaui->oaui_data, 0, rc);
152 osp_async_update_item_fini(env, oaui);
154 out_destroy_update_req(update);
156 LASSERT(list_empty(&update->dur_list));
/* NOTE(review): only oaua_update is assigned here; oaua_flow_control, which
 * the interpreter reads, is not set on this path -- presumably the
 * async-args area starts out zeroed; confirm. */
158 args = ptlrpc_req_async_args(req);
159 args->oaua_update = update;
160 req->rq_interpret_reply = osp_async_update_interpret;
161 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
167 /* with osp::opd_async_requests_mutex held */
/*
 * Return the device's pending asynchronous update request
 * (osp->opd_async_requests). A new request is created with
 * out_create_update_req() and cached on the device.
 * NOTE(review): the conditions around creation and the return are not visible
 * in this listing; presumably a request is created only when none is cached.
 */
168 struct dt_update_request *
169 osp_find_or_create_async_update_request(struct osp_device *osp)
171 struct dt_update_request *update = osp->opd_async_requests;
176 update = out_create_update_req(&osp->opd_dt_dev);
178 osp->opd_async_requests = update;
183 /* with osp::opd_async_requests_mutex held */
/*
 * Add one update for obj to the given request and register a callback item
 * (interpterer plus data) on the request's dur_cb_items list so the result
 * can be delivered later.
 * NOTE(review): several conditions, labels and the return are not visible in
 * this listing, so the exact branch structure below is not fully shown.
 */
184 int osp_insert_async_update(const struct lu_env *env,
185 struct dt_update_request *update, int op,
186 struct osp_object *obj, int count,
187 int *lens, const char **bufs, void *data,
188 osp_async_update_interpterer_t interpterer)
190 struct osp_async_update_item *oaui;
191 struct osp_device *osp = lu2osp_dev(osp2lu_obj(obj)->lo_dev);
195 oaui = osp_async_update_item_init(obj, data, interpterer);
200 rc = out_insert_update(env, update, op, lu_object_fid(osp2lu_obj(obj)),
/* Detach the pending request from the device and send it. The mutex is
 * dropped around osp_unplug_async_update() and re-taken before a replacement
 * request is looked up. NOTE(review): the condition guarding this path is
 * not visible -- presumably out_insert_update() reporting that the request
 * cannot take another update. */
203 osp->opd_async_requests = NULL;
204 mutex_unlock(&osp->opd_async_requests_mutex);
206 rc = osp_unplug_async_update(env, osp, update);
207 mutex_lock(&osp->opd_async_requests_mutex);
211 update = osp_find_or_create_async_update_request(osp);
213 GOTO(out, rc = PTR_ERR(update));
/* Queue the callback where the reply interpreter walks dur_cb_items. */
219 list_add_tail(&oaui->oaui_list, &update->dur_cb_items);
/* Release of the item; the guarding condition is not visible here
 * (presumably reached on failure only -- confirm). */
225 osp_async_update_item_fini(env, oaui);
231 * If the transaction creation goes to OSP, it means the updates
232 * in this transaction include only remote UPDATEs. It is only
233 * used by LFSCK right now.
/*
 * Build a transaction handle tagged LCT_TX_HANDLE with a reference count of
 * one, together with a thandle_update whose remote-update list is empty and
 * which is flagged tu_only_remote_trans. Both allocation failures jump to
 * "out" with -ENOMEM.
 * NOTE(review): the allocation calls, the cleanup at "out" and the return are
 * not visible in this listing.
 */
235 struct thandle *osp_trans_create(const struct lu_env *env, struct dt_device *d)
237 struct thandle *th = NULL;
238 struct thandle_update *tu = NULL;
242 if (unlikely(th == NULL))
243 GOTO(out, rc = -ENOMEM);
246 th->th_tags = LCT_TX_HANDLE;
247 atomic_set(&th->th_refc, 1);
248 th->th_alloc_size = sizeof(*th);
252 GOTO(out, rc = -ENOMEM);
254 INIT_LIST_HEAD(&tu->tu_remote_update_list);
255 tu->tu_only_remote_trans = 1;
/*
 * Send dt_update to the remote target. A transaction that contains only
 * remote updates is sent asynchronously through ptlrpcd with
 * osp_async_update_interpret() as reply interpreter; any other transaction is
 * sent with out_remote_sync(). flow_control is forwarded to the interpreter
 * via the async args.
 * NOTE(review): some conditions, the else-branches and the return are not
 * visible in this listing.
 */
270 static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp,
271 struct dt_update_request *dt_update,
272 struct thandle *th, bool flow_control)
274 struct thandle_update *tu = th->th_update;
279 /* If the transaction only includes remote update, it should
280 * still be asynchronous */
281 if (is_only_remote_trans(th)) {
282 struct osp_async_update_args *args;
283 struct ptlrpc_request *req;
/* Take the update off the transaction's list before handing it to the
 * asynchronous path. */
285 list_del_init(&dt_update->dur_list);
286 rc = out_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
287 dt_update->dur_req, &req);
289 args = ptlrpc_req_async_args(req);
290 args->oaua_update = dt_update;
291 args->oaua_flow_control = flow_control;
292 req->rq_interpret_reply =
293 osp_async_update_interpret;
294 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
/* Presumably the request-preparation failure path (its condition is not
 * visible): the update is destroyed here instead of by the interpreter. */
296 out_destroy_update_req(dt_update);
299 /* Before we support async update, the cross MDT transaction
300 * has to be synchronized */
302 rc = out_remote_sync(env, osp->opd_obd->u.cli.cl_import,
/*
 * Start-side hook for this OSP device: look up the update request this
 * transaction queued for the device and, unless the transaction is
 * remote-only or the update is flagged to be sent after the local
 * transaction, trigger it now.
 * NOTE(review): the remaining parameter line, the early-exit statement after
 * the NULL check and the return are not visible in this listing.
 */
309 int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
312 struct thandle_update *tu = th->th_update;
313 struct dt_update_request *dt_update;
319 /* Check whether there are updates related with this OSP */
320 dt_update = out_find_update(tu, dt);
321 if (dt_update == NULL)
324 /* Note: some updates need to be sent before the local transaction,
325 * some need to be sent after the local transaction.
327 * If the transaction only includes remote updates, it will
328 * send updates to remote MDT in osp_trans_stop.
330 * If it is remote create, it will send the remote req after
331 * local transaction. i.e. create the object locally first,
332 * then insert the name entry.
334 * If it is remote unlink, it will send the remote req before
335 * the local transaction, i.e. delete the name entry remote
336 * first, then destroy the local object. */
337 if (!is_only_remote_trans(th) && !tu->tu_sent_after_local_trans)
338 rc = osp_trans_trigger(env, dt2osp_dev(dt), dt_update, th,
/*
 * Stop-side hook for this OSP device. Looks up the update request queued for
 * the device; a request with ourq_count == 0 is destroyed. For a remote-only
 * transaction with th_result == 0 a request slot is requested and the update
 * is triggered with flow control enabled. For other transactions the update
 * is triggered here only when tu_sent_after_local_trans is set, after which
 * the result is taken from dur_rc and the update request is destroyed.
 * NOTE(review): many lines (remaining parameter, braces, several conditions,
 * jumps and the return) are not visible in this listing.
 */
344 int osp_trans_stop(const struct lu_env *env, struct dt_device *dt,
347 struct thandle_update *tu = th->th_update;
348 struct dt_update_request *dt_update;
352 LASSERT(tu != LP_POISON);
353 /* Check whether there are updates related with this OSP */
354 dt_update = out_find_update(tu, dt);
355 if (dt_update == NULL) {
356 if (!is_only_remote_trans(th))
/* Nothing was queued in the request, so there is nothing to send. */
361 if (dt_update->dur_req->ourq_count == 0) {
362 out_destroy_update_req(dt_update);
366 if (is_only_remote_trans(th)) {
367 if (th->th_result == 0) {
/* NOTE(review): the device is derived from th->th_dev here, while the
 * trigger call below uses dt -- presumably the same device; confirm. */
368 struct osp_device *osp = dt2osp_dev(th->th_dev);
369 struct client_obd *cli = &osp->opd_obd->u.cli;
/* The import/disconnect state is examined right after the slot request;
 * in that branch the slot is given back and the update destroyed (the
 * guarding conditions between these lines are not visible). */
371 rc = obd_get_request_slot(cli);
372 if (!osp->opd_imp_active || osp->opd_got_disconnected) {
374 obd_put_request_slot(cli);
380 out_destroy_update_req(dt_update);
/* flow_control == true: the reply interpreter performs the flow-control
 * step for this request. */
384 rc = osp_trans_trigger(env, dt2osp_dev(dt),
385 dt_update, th, true);
387 obd_put_request_slot(cli);
390 out_destroy_update_req(dt_update);
393 if (tu->tu_sent_after_local_trans)
394 rc = osp_trans_trigger(env, dt2osp_dev(dt),
395 dt_update, th, false);
/* NOTE(review): the value returned by osp_trans_trigger() above is
 * overwritten here; only dur_rc is reported on this path. */
396 rc = dt_update->dur_rc;
397 out_destroy_update_req(dt_update);