Codelab: SQL↔KV hacking
- Jordan Lewis
This codelab touches on the SQL layer's Insert node and how it communicates its writes to the KV layer. It also touches on lower-level (storage-level) concepts such as transactions and intents. As an excuse to visit interesting parts of the code, we'll hack together a nifty feature: a workaround for a current CRDB limitation on large transactions.
This codelab is out of date: the feature it builds has since been implemented. Nevertheless, it may still be useful to see the process of editing this code, so the page persists.
There is currently a limit on the number of rows that a SQL transaction can write before becoming “too large to commit”. The reasons for this limitation will be explained. It is surprisingly easy for an unsuspecting SQL user to find herself in this situation: statements like INSERT INTO <table> SELECT … and CREATE TABLE <table> AS SELECT … can easily exceed the current limit of 100k rows (strictly speaking, the limit is on write intents, as the error below shows). Even if the client has not explicitly opened a transaction, CRDB internally runs every SQL statement in an implicit transaction. This codelab will modify the execution of CREATE TABLE … AS … so that it no longer runs into this limitation. The point of the exercise is to familiarize the reader with parts of the CRDB codebase, not the actual design of the solution.
user@bash: make build
user@bash: ./cockroach start --insecure --logtostderr
user@bash: ./cockroach sql --insecure
# Welcome to the cockroach SQL interface.
# All statements must be terminated by a semicolon.
# To exit: CTRL + D.
root@:26257/> create database codelab;
CREATE DATABASE
root@:26257/> CREATE TABLE codelab.series AS SELECT generate_series FROM generate_series(1, 100100);
pq: kv/txn_coord_sender.go:423: transaction is too large to commit: 100108 intents
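The error is raised by the TxnCoordSender when the transaction it is about to commit carries more intent spans than the limit allows. The following is a minimal sketch of that check, not CockroachDB's code: `coordinator`, `recordWrite`, `commit` and `maxIntents` are hypothetical names, each write is assumed to leave exactly one point intent, and the 100k figure is the limit quoted in the text.

```go
package main

import "fmt"

// maxIntents is the 100k limit quoted in the text (hypothetical name).
const maxIntents = 100000

// span is a simplified [Key, EndKey) key span; EndKey is empty for a point write.
type span struct {
	Key, EndKey string
}

// coordinator is a hypothetical stand-in for the TxnCoordSender's
// per-transaction bookkeeping: it remembers one span per write.
type coordinator struct {
	intents []span
}

// recordWrite tracks the intent left behind by a single-key write.
func (c *coordinator) recordWrite(key string) {
	c.intents = append(c.intents, span{Key: key})
}

// commit refuses to "send" EndTransaction when too many intent spans were
// tracked, returning an error shaped like the one in the transcript.
func (c *coordinator) commit() error {
	if len(c.intents) > maxIntents {
		return fmt.Errorf("transaction is too large to commit: %d intents", len(c.intents))
	}
	return nil
}

func main() {
	var c coordinator
	// 100108 writes: the intent count reported in the transcript above.
	for i := 0; i < 100108; i++ {
		c.recordWrite(fmt.Sprintf("/Table/51/1/%d", i))
	}
	fmt.Println(c.commit()) // transaction is too large to commit: 100108 intents
}
```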
Intermezzo 1: Transactions and Intents
Intermezzo 2: Cockroach KV requests
Intermezzo 3: Terminology
- Request: a request refers to a KV request, namely a BatchRequest (even single requests are wrapped in batches before being sent to KV). These are protobuf messages that will be routed to the data range on which they’re supposed to operate and eventually return a response. They’re split into read and write requests; besides the types of results these categories produce, an important difference is that only writes need to go through Raft.
- Evaluation: when a request arrives at its destination range, it is evaluated. For reads, evaluation means actually reading data from the storage layer and packaging it into a response. For writes, evaluation means figuring out exactly what low-level changes are to be made to the storage-level data (e.g. a DeleteRange request will cause a bunch of tombstone records to be written to storage) and serializing all these changes into a command. Evaluation happens on the current leaseholder replica of the range in question.
- Command: a command is the result of evaluating a write request. It contains a binary representation of changes to be made to storage (RocksDB). It is proposed to Raft and, if accepted, it will subsequently be applied on all the replicas of the range.
- Proposing: a command is proposed to the Raft group formed by all the replicas of a range. If a quorum of the replicas accepts it (according to the Raft consensus protocol rules), the command is committed to the Raft log, where it occupies one position. This log is continuously applied by all the members of the Raft group (i.e. all the range’s replicas).
- Application: commands are taken off the log by the Raft machinery and applied to the local RocksDB instance. This involves deserializing the changes marshalled inside the command and… applying them to the storage engine. After application, the RocksDB instance on that replica reflects the initial write request that triggered the whole saga.
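The lifecycle in the list above (request, evaluation, command, proposal, application) can be modelled in a few lines. This is a toy sketch, not CockroachDB's code: the types `putRequest`, `command` and `replica` and the functions `evaluate`, `propose` and `apply` are hypothetical, "accepting" a proposal is reduced to a replica being online, and a plain map stands in for RocksDB.

```go
package main

import "fmt"

// putRequest is a simplified write request: a single Put.
type putRequest struct {
	Key, Value string
}

// mutation is one low-level storage change.
type mutation struct {
	Key, Value string
}

// command is the result of evaluating a write: the list of changes to apply.
type command []mutation

// replica holds one copy of the range's data; a map stands in for RocksDB.
type replica struct {
	store  map[string]string
	online bool
}

// evaluate runs on the leaseholder and computes the changes for a request
// without touching storage.
func evaluate(req putRequest) command {
	return command{{Key: req.Key, Value: req.Value}}
}

// propose appends cmd to the log only if a majority of replicas accept it;
// in this toy model a replica accepts simply by being online.
func propose(raftLog *[]command, replicas []*replica, cmd command) bool {
	acks := 0
	for _, r := range replicas {
		if r.online {
			acks++
		}
	}
	if 2*acks <= len(replicas) {
		return false // no quorum: the command never enters the log
	}
	*raftLog = append(*raftLog, cmd)
	return true
}

// apply writes a committed command's changes into a replica's store.
func apply(r *replica, cmd command) {
	for _, m := range cmd {
		r.store[m.Key] = m.Value
	}
}

func main() {
	replicas := []*replica{
		{store: map[string]string{}, online: true},
		{store: map[string]string{}, online: true},
		{store: map[string]string{}, online: false},
	}
	var raftLog []command
	cmd := evaluate(putRequest{Key: "a", Value: "1"})
	if propose(&raftLog, replicas, cmd) {
		// Offline replicas would catch up from the log later; skipped here.
		for _, r := range replicas {
			if r.online {
				apply(r, cmd)
			}
		}
	}
	fmt.Println(len(raftLog), replicas[0].store["a"]) // 1 1
}
```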
user@bash: grep -r "transaction is too large to commit" .
./pkg/kv/txn_coord_sender.go: return roachpb.NewErrorf("transaction is too large to commit: %d intents", len(et.IntentSpans))
Intermezzo 4: SQL execution
SELECT * FROM customers WHERE State LIKE 'C%' AND strpos(address, 'Infinite') != 0 ORDER BY Name;
root@:26257> EXPLAIN(EXPRS) SELECT * FROM customers WHERE State LIKE 'C%' and strpos(address, 'Infinite') != 0 order by name;
+-------+------------+--------+----------------------------------+
| Level | Type | Field | Description |
+-------+------------+--------+----------------------------------+
| 0 | sort | | |
| 0 | | order | +name |
| 1 | index-join | | |
| 2 | scan | | |
| 2 | | table | customers@SI |
| 2 | | spans | /"C"-/"D" |
| 2 | | filter | state LIKE 'C%' |
| 2 | scan | | |
| 2 | | table | customers@primary |
| 2 | | filter | strpos(address, 'Infinite') != 0 |
+-------+------------+--------+----------------------------------+
sortNode -> indexJoinNode -> scanNode (index)
                          -> scanNode (PK)
if next, err := n.run.rows.Next(ctx); !next {
...
}
_, err = n.tw.row(ctx, rowVals)
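The two fragments above come from the insertNode, which pulls rows from its source and hands each one to a tableWriter (the tableInserter, for plain INSERTs). Below is a hypothetical, stripped-down model of that division of labour. It assumes, as the tableInserter diffs in this codelab suggest, that all of an insert's writes are queued into a single batch that is sent in finalize; contexts, transactions and key encoding are left out, and the key format is illustrative only.

```go
package main

import "fmt"

// batch is a hypothetical stand-in for client.Batch: it just accumulates puts.
type batch struct {
	puts []string
}

// tableWriter has the same three-step shape as the interface in the text
// (init, row, finalize), minus contexts and transactions.
type tableWriter interface {
	init()
	row(values []int) error
	finalize() error
}

// tableInserter queues one put per row and sends the batch in finalize.
type tableInserter struct {
	b    *batch
	sent [][]string // batches "sent" to KV, kept for inspection
}

func (ti *tableInserter) init() { ti.b = &batch{} }

func (ti *tableInserter) row(values []int) error {
	ti.b.puts = append(ti.b.puts, fmt.Sprintf("/Table/51/1/%d", values[0]))
	return nil
}

func (ti *tableInserter) finalize() error {
	ti.sent = append(ti.sent, ti.b.puts)
	return nil
}

// insertNode pulls one row per Next call and hands it to the tableWriter;
// once the source is exhausted it finalizes the writer.
type insertNode struct {
	source [][]int
	pos    int
	tw     tableWriter
}

func (n *insertNode) Next() (bool, error) {
	if n.pos >= len(n.source) {
		return false, n.tw.finalize()
	}
	rowVals := n.source[n.pos]
	n.pos++
	if err := n.tw.row(rowVals); err != nil {
		return false, err
	}
	return true, nil
}

func main() {
	ti := &tableInserter{}
	ti.init()
	n := &insertNode{source: [][]int{{1}, {2}, {3}}, tw: ti}
	for {
		more, err := n.Next()
		if err != nil {
			panic(err)
		}
		if !more {
			break
		}
	}
	fmt.Println(len(ti.sent), len(ti.sent[0])) // 1 3
}
```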
The patch
- We’ll add a flag to the BatchRequest informing the TxnCoordSender that it shouldn’t keep track of intents pertaining to write requests in this batch. This flag will be set on the batch produced by the insertNode that’s created by a createTableNode. Setting the flag implies that the client takes responsibility for informing the TxnCoordSender in some other way about the intents. It is imperative that the TxnCoordSender is informed about the intents before the EndTransaction request committing the transaction is sent - otherwise the data will be silently rolled back.
- We’ll create a new type of KV request, RecordIntentsRequest, which a client can send to inform the TxnCoordSender of a range of intents that needs to be scanned on EndTransaction. This request is quite hacky: it’s unlike any other request in that it is terminated by the TxnCoordSender and not actually sent further (i.e. to a data range). It is unfortunate that we need to create a new type of dummy request for this, as opposed to talking to the TxnCoordSender more directly. The problem is that the TxnCoordSender is hidden behind the narrow client.Sender interface that we’ve described. This is a general problem that will be addressed soon (issue #10511).
- The tableInserter will send such a request before committing the transaction. As a result, the TxnCoordSender will attach the new table’s whole key space to the EndTransaction request.
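The three pieces of the plan fit together as sketched below. This is a self-contained toy model, not the patch itself: `coordinator`, `batch` and `request` are hypothetical, request kinds are plain strings, and `ignoreIntents` plays the role of the proposed TxnCoordSenderIgnoreIntents flag. It shows 100,100 untracked puts ending up as a single intent span on the EndTransaction.

```go
package main

import "fmt"

// span is a simplified [Key, EndKey) span; EndKey is empty for a point write.
type span struct{ Key, EndKey string }

// request is a simplified union of the request kinds involved in the plan.
type request struct {
	kind        string // "put", "recordIntents" or "endTxn"
	key         string // set for "put"
	intentSpans []span // set for "recordIntents"
}

// batch mirrors a BatchRequest whose header may carry the proposed flag.
type batch struct {
	ignoreIntents bool // stands in for TxnCoordSenderIgnoreIntents
	reqs          []request
}

// coordinator is a hypothetical stand-in for the TxnCoordSender, for one txn.
type coordinator struct {
	tracked   []span // intents the coordinator knows about
	committed []span // intent spans attached to the EndTransaction
}

// send processes a batch the way the plan describes.
func (c *coordinator) send(b batch) {
	for _, r := range b.reqs {
		switch r.kind {
		case "put":
			// Normally every write is tracked; the flag opts the batch out.
			if !b.ignoreIntents {
				c.tracked = append(c.tracked, span{Key: r.key})
			}
		case "recordIntents":
			// Handled here and never forwarded to a range.
			c.tracked = append(c.tracked, r.intentSpans...)
		case "endTxn":
			c.committed = append([]span(nil), c.tracked...)
		}
	}
}

func main() {
	var c coordinator
	rows := batch{ignoreIntents: true}
	for i := 0; i < 100100; i++ {
		rows.reqs = append(rows.reqs, request{kind: "put", key: fmt.Sprintf("/Table/51/1/%d", i)})
	}
	c.send(rows)
	c.send(batch{reqs: []request{{kind: "recordIntents", intentSpans: []span{{Key: "/Table/51", EndKey: "/Table/52"}}}}})
	c.send(batch{reqs: []request{{kind: "endTxn"}}})
	fmt.Println(len(c.committed), c.committed[0]) // 1 {/Table/51 /Table/52}
}
```

If the RecordIntents batch were skipped, `committed` would be empty even though 100,100 writes happened, which is the failure mode the first bullet warns about.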
@@ -1018,6 +1024,15 @@ message Header {
// gateway_node_id is the ID of the gateway node where the request originated.
optional int32 gateway_node_id = 11 [(gogoproto.nullable) = false,
(gogoproto.customname) = "GatewayNodeID", (gogoproto.casttype) = "NodeID"];
+ // If set, the TxnCoordSender will not do its regular job of keeping track of
+ // intents laid down by write requests in this batch. This can be used when
+ // the client wants to take upon itself the job of tracking intents -
+ // presumably because it knows how to group them into a few key spans.
+ //
+ // This should only be used for transactional requests. The client should
+ // either fill in the IntentSpans on the EndTransaction or should send a
+ // RecordIntentsRequest.
+ optional bool txn_coord_sender_ignore_intents = 12 [(gogoproto.nullable) = false];
}
user@bash: make protobuf
+// InsertType differentiates between different behaviours of an Insert planNode.
+type InsertType bool
+
+const (
+ // RegularInsert activates the default behaviour.
+ RegularInsert InsertType = false
+ // NoIntentTracking will cause the Insert node to ask the TxnCoordSender to
+ // not track the intents produced by its write batches. Instead, the node will
+ // manually inform the TxnCoordSender of a single intent range, equal to the
+ // whole key span of the table's PK. This is useful for going around
+ // limitations on the maximum number of intents that a txn can accumulate and
+ // works well when the table did not have any data previous to the insert.
+ // This is used for CREATE TABLE AS SELECT... statements.
+ //
+ // ATTENTION: Do not use this on tables with secondary indexes; intents on
+ // those will not be cleaned up properly, resulting in writes to those indexes
+ // being dropped.
+ NoIntentTracking InsertType = true
+)
+
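Replacing per-row tracking with one span over the table works because all of a table's keys share an encoded prefix, and that encoding sorts in table-ID order. A sketch of the idea under a simplifying assumption: the hypothetical `tablePrefix` below uses a fixed-width big-endian encoding rather than the variable-length, order-preserving encoding CockroachDB uses, but the property that matters, that every key of table N sorts inside [prefix(N), prefix(N+1)), holds for both. `rowKey` is likewise a hypothetical key layout.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// tablePrefix is a hypothetical simplification of keys.MakeTablePrefix: a
// fixed-width big-endian encoding whose byte order matches numeric order.
func tablePrefix(tableID uint32) []byte {
	b := make([]byte, 4)
	binary.BigEndian.PutUint32(b, tableID)
	return b
}

// tableSpan returns the half-open span [prefix(id), prefix(id+1)), the same
// shape as the intent span built in finalize (assumes id < math.MaxUint32).
func tableSpan(tableID uint32) (start, end []byte) {
	return tablePrefix(tableID), tablePrefix(tableID + 1)
}

// contains reports whether key lies in [start, end).
func contains(start, end, key []byte) bool {
	return bytes.Compare(key, start) >= 0 && bytes.Compare(key, end) < 0
}

// rowKey builds a hypothetical row key: table prefix, index ID, primary key.
func rowKey(tableID uint32, indexID byte, pk uint32) []byte {
	pkBytes := make([]byte, 4)
	binary.BigEndian.PutUint32(pkBytes, pk)
	key := append(tablePrefix(tableID), indexID)
	return append(key, pkBytes...)
}

func main() {
	start, end := tableSpan(51)
	fmt.Println(contains(start, end, rowKey(51, 1, 100100))) // true
	fmt.Println(contains(start, end, rowKey(52, 1, 0)))      // false
}
```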
@@ -91,6 +92,8 @@ type tableInserter struct {
// Set by init.
txn *client.Txn
b *client.Batch
+
+ insertType InsertType
}
func (ti *tableInserter) walkExprs(_ func(desc string, index int, expr parser.TypedExpr)) {}
@@ -98,6 +101,9 @@ func (ti *tableInserter) walkExprs(_ func(desc string, index int, expr parser.Ty
func (ti *tableInserter) init(txn *client.Txn) error {
ti.txn = txn
ti.b = txn.NewBatch()
+ if ti.insertType == NoIntentTracking {
+ ti.b.Header.TxnCoordSenderIgnoreIntents = true
+ }
return nil
}
// Insert inserts rows into the database.
// Privileges: INSERT on table. Also requires UPDATE on "ON DUPLICATE KEY UPDATE".
// Notes: postgres requires INSERT. No "on duplicate key update" option.
// mysql requires INSERT. Also requires UPDATE on "ON DUPLICATE KEY UPDATE".
func (p *planner) Insert(
- ctx context.Context, n *parser.Insert, desiredTypes []parser.Type,
+ ctx context.Context, n *parser.Insert, desiredTypes []parser.Type, insertType InsertType,
) (planNode, error) {
tn, err := p.getAliasedTableName(n.Table)
if err != nil {
@@ -139,7 +159,7 @@ func (p *planner) Insert(
var tw tableWriter
if n.OnConflict == nil {
- tw = &tableInserter{ri: ri, autoCommit: p.autoCommit}
+ tw = &tableInserter{ri: ri, autoCommit: p.autoCommit, insertType: insertType}
} else {
updateExprs, conflictIndex, err := upsertExprsAndIndex(en.tableDesc, *n.OnConflict, ri.InsertCols)
if err != nil {
@@ -323,7 +323,7 @@ func (p *planner) newPlan(
case *parser.Help:
return p.Help(ctx, n)
case *parser.Insert:
- return p.Insert(ctx, n, desiredTypes)
+ return p.Insert(ctx, n, desiredTypes, RegularInsert)
case *parser.ParenSelect:
return p.newPlan(ctx, n.Select, desiredTypes)
case *parser.Relocate:
@@ -410,7 +410,7 @@ func (p *planner) prepare(ctx context.Context, stmt parser.Statement) (planNode,
case *parser.Help:
return p.Help(ctx, n)
case *parser.Insert:
- return p.Insert(ctx, n, nil)
+ return p.Insert(ctx, n, nil, RegularInsert)
case *parser.Select:
return p.Select(ctx, n, nil)
case *parser.SelectClause:
@@ -727,7 +727,8 @@ func (n *createTableNode) Start(ctx context.Context) error {
Rows: n.n.AsSource,
Returning: parser.AbsentReturningClause,
}
- insertPlan, err := n.p.Insert(ctx, insert, nil /* desiredTypes */)
+ // We'll use the NoIntentTracking optimization since this is a new table.
+ insertPlan, err := n.p.Insert(ctx, insert, nil /* desiredTypes */, NoIntentTracking)
if err != nil {
return err
}
@@ -392,23 +409,25 @@ func (tc *TxnCoordSender) Send(
txnMeta := tc.txnMu.txns[txnID]
distinctSpans := true
if txnMeta != nil {
et.IntentSpans = txnMeta.keys
// Defensively set distinctSpans to false if we had any previous
// requests in this transaction. This effectively limits the distinct
// spans optimization to 1pc transactions.
distinctSpans = len(txnMeta.keys) == 0
}
- // We can't pass in a batch response here to better limit the key
- // spans as we don't know what is going to be affected. This will
- // affect queries such as `DELETE FROM my.table LIMIT 10` when
- // executed as a 1PC transaction. e.g.: a (BeginTransaction,
- // DeleteRange, EndTransaction) batch.
- ba.IntentSpanIterate(nil, func(key, endKey roachpb.Key) {
- et.IntentSpans = append(et.IntentSpans, roachpb.Span{
- Key: key,
- EndKey: endKey,
+ if !ba.TxnCoordSenderIgnoreIntents {
+ // We can't pass in a batch response here to better limit the key
+ // spans as we don't know what is going to be affected. This will
+ // affect queries such as `DELETE FROM my.table LIMIT 10` when
+ // executed as a 1PC transaction. e.g.: a (BeginTransaction,
+ // DeleteRange, EndTransaction) batch.
+ ba.IntentSpanIterate(nil, func(key, endKey roachpb.Key) {
+ et.IntentSpans = append(et.IntentSpans, roachpb.Span{
+ Key: key,
+ EndKey: endKey,
+ })
})
- })
+ }
@@ -942,12 +961,14 @@ func (tc *TxnCoordSender) updateState(
if txnMeta != nil {
keys = txnMeta.keys
}
- ba.IntentSpanIterate(br, func(key, endKey roachpb.Key) {
- keys = append(keys, roachpb.Span{
- Key: key,
- EndKey: endKey,
+ if !ba.TxnCoordSenderIgnoreIntents {
+ ba.IntentSpanIterate(br, func(key, endKey roachpb.Key) {
+ keys = append(keys, roachpb.Span{
+ Key: key,
+ EndKey: endKey,
+ })
})
- })
+ }
@@ -959,7 +980,7 @@ func (tc *TxnCoordSender) updateState(
if txnMeta != nil {
txnMeta.keys = keys
- } else if len(keys) > 0 {
+ } else if len(keys) > 0 || ba.TxnCoordSenderIgnoreIntents {
// If the transaction is already over, there's no point in
// launching a one-off coordinator which will shut down right
// away. If we ended up here with an error, we'll always start
@@ -997,7 +1018,6 @@ func (tc *TxnCoordSender) updateState(
}
}
}
RecordIntentsRequest
@@ -283,6 +283,11 @@ message EndTransactionRequest {
optional bool require_1pc = 6 [(gogoproto.nullable) = false, (gogoproto.customname) = "Require1PC"];
}
+message RecordIntentsRequest {
+ optional Span header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];
+ repeated Span intent_spans = 2 [(gogoproto.nullable) = false];
+}
+
@@ -908,6 +913,7 @@ message RequestUnion {
optional ImportRequest import = 34;
optional QueryTxnRequest query_txn = 33;
optional AdminScatterRequest admin_scatter = 36;
+ optional RecordIntentsRequest record_intents = 37;
}
@@ -232,6 +232,9 @@ func (b *Batch) fillResults() error {
result.Keys = reply.(*roachpb.DeleteRangeResponse).Keys
}
+ case *roachpb.RecordIntentsRequest:
+ // nothing to do. This request has an empty result.
+
default:
if result.Err == nil {
result.Err = errors.Errorf("unsupported reply: %T for %T",
@@ -429,6 +429,9 @@ func (*BeginTransactionRequest) Method() Method { return BeginTransaction }
func (*EndTransactionRequest) Method() Method { return EndTransaction }
// Method implements the Request interface.
+func (*RecordIntentsRequest) Method() Method { return RecordIntents }
+
+// Method implements the Request interface.
func (*AdminSplitRequest) Method() Method { return AdminSplit }
// Method implements the Request interface.
@@ -570,6 +573,12 @@ func (etr *EndTransactionRequest) ShallowCopy() Request {
}
// ShallowCopy implements the Request interface.
+func (rir *RecordIntentsRequest) ShallowCopy() Request {
+ shallowCopy := *rir
+ return &shallowCopy
+}
+
+// ShallowCopy implements the Request interface.
func (asr *AdminSplitRequest) ShallowCopy() Request {
shallowCopy := *asr
return &shallowCopy
@@ -883,6 +892,7 @@ func (*BeginTransactionRequest) flags() int { return isWrite | isTxn | consultsT
// replays. Replays for the same transaction key and timestamp will
// have Txn.WriteTooOld=true and must retry on EndTransaction.
func (*EndTransactionRequest) flags() int { return isWrite | isTxn | isAlone | updatesTSCache }
+func (*RecordIntentsRequest) flags() int { return isWrite | isTxn | isAlone }
func (*AdminSplitRequest) flags() int { return isAdmin | isAlone }
func (*AdminMergeRequest) flags() int { return isAdmin | isAlone }
func (*AdminTransferLeaseRequest) flags() int { return isAdmin | isAlone }
@@ -59,6 +59,7 @@ const (
BeginTransaction
// EndTransaction either commits or aborts an ongoing transaction.
EndTransaction
+ RecordIntents
// AdminSplit is called to coordinate a split of a range.
AdminSplit
// AdminMerge is called to coordinate a merge of two adjacent ranges.
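Each addition above is plumbing that lets the new type behave like any other request: a Method value, a ShallowCopy, and a set of flags that the rest of the system consults. The sketch below reproduces that shape in isolation. `Request`, `Method` and the flag constants are pared-down, hypothetical versions of the roachpb ones (the real interface has more methods and more flags), and `IsAlone` here is a local helper.

```go
package main

import "fmt"

// Method identifies a request type (only a few are listed here).
type Method int

const (
	Get Method = iota
	Put
	EndTransaction
	RecordIntents
)

// Flag bits named after the ones used in the diff.
const (
	isRead = 1 << iota
	isWrite
	isTxn
	isAlone
)

// Request is a trimmed-down, hypothetical version of the request interface.
type Request interface {
	Method() Method
	ShallowCopy() Request
	flags() int
}

type span struct{ Key, EndKey string }

// RecordIntentsRequest carries intent spans tracked by the client.
type RecordIntentsRequest struct {
	IntentSpans []span
}

// Method implements the Request interface.
func (*RecordIntentsRequest) Method() Method { return RecordIntents }

// ShallowCopy implements the Request interface; the copy shares IntentSpans.
func (r *RecordIntentsRequest) ShallowCopy() Request {
	shallowCopy := *r
	return &shallowCopy
}

func (*RecordIntentsRequest) flags() int { return isWrite | isTxn | isAlone }

// IsAlone reports whether a request must be the only one in its batch.
func IsAlone(r Request) bool { return r.flags()&isAlone != 0 }

func main() {
	var req Request = &RecordIntentsRequest{IntentSpans: []span{{"/Table/51", "/Table/52"}}}
	cp := req.ShallowCopy().(*RecordIntentsRequest)
	fmt.Println(req.Method() == RecordIntents, IsAlone(req), len(cp.IntentSpans)) // true true 1
}
```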
@@ -325,6 +325,29 @@ func (tc *TxnCoordSender) Send(
) (*roachpb.BatchResponse, *roachpb.Error) {
ctx = tc.AnnotateCtx(ctx)
+ // Handle the RecordIntents request. That's a special request not meant to be
+ // passed further than the TxnCoordSender.
+ if len(ba.Requests) == 1 &&
+ ba.Requests[0].GetInner().Method() == roachpb.RecordIntents {
+ union := roachpb.ResponseUnion{}
+ union.MustSetInner(&roachpb.NoopResponse{})
+ br := &roachpb.BatchResponse{
+ Responses: []roachpb.ResponseUnion{union},
+ }
+
+ tc.txnMu.Lock()
+ defer tc.txnMu.Unlock()
+ txnMeta, ok := tc.txnMu.txns[*ba.Txn.ID]
+ if !ok {
+ // This transaction didn't previously perform any writes, so it's
+ // pointless to record any intents for it.
+ return br, nil
+ }
+ ri := ba.Requests[0].GetInner().(*roachpb.RecordIntentsRequest)
+ txnMeta.keys = append(txnMeta.keys, ri.IntentSpans...)
+ return br, nil
+ }
+
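The interception works because the TxnCoordSender is itself a Sender that wraps the next Sender in the chain, so it can answer a batch without forwarding it. Below is a hypothetical, self-contained model of that decorator pattern: `Sender`, `batchRequest`, `distSender` and `txnCoordSender` are simplified stand-ins, the transaction map is only read or written while holding the mutex, and, echoing the updateState change, a transaction is registered on its first regular batch even though none of its intents may be tracked.

```go
package main

import (
	"fmt"
	"sync"
)

type span struct{ Key, EndKey string }

// batchRequest is a simplified BatchRequest belonging to one transaction.
type batchRequest struct {
	txnID         string
	recordIntents []span // non-nil marks a RecordIntents batch
}

// Sender has the same narrow shape as client.Sender: one batch in, one response out.
type Sender interface {
	Send(ba batchRequest) (string, error)
}

// distSender is a hypothetical downstream Sender that would route batches to ranges.
type distSender struct{ sent int }

func (d *distSender) Send(ba batchRequest) (string, error) {
	d.sent++
	return "ok", nil
}

// txnCoordSender decorates another Sender and answers RecordIntents itself.
type txnCoordSender struct {
	wrapped Sender
	mu      sync.Mutex
	txns    map[string][]span // txn ID -> tracked intent spans
}

func (tc *txnCoordSender) Send(ba batchRequest) (string, error) {
	if ba.recordIntents != nil {
		tc.recordIntents(ba)
		return "noop", nil // answered here; never forwarded
	}
	tc.mu.Lock()
	if _, ok := tc.txns[ba.txnID]; !ok {
		tc.txns[ba.txnID] = nil // start tracking this transaction
	}
	tc.mu.Unlock()
	return tc.wrapped.Send(ba)
}

// recordIntents appends client-tracked spans, ignoring unknown transactions.
func (tc *txnCoordSender) recordIntents(ba batchRequest) {
	tc.mu.Lock()
	defer tc.mu.Unlock()
	if _, ok := tc.txns[ba.txnID]; ok {
		tc.txns[ba.txnID] = append(tc.txns[ba.txnID], ba.recordIntents...)
	}
}

func main() {
	ds := &distSender{}
	tc := &txnCoordSender{wrapped: ds, txns: map[string][]span{}}
	_, _ = tc.Send(batchRequest{txnID: "t1"}) // a regular batch, forwarded
	resp, _ := tc.Send(batchRequest{txnID: "t1", recordIntents: []span{{"/Table/51", "/Table/52"}}})
	fmt.Println(resp, ds.sent, len(tc.txns["t1"])) // noop 1 1
}
```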
@@ -107,13 +113,38 @@ func (ti *tableInserter) row(ctx context.Context, values parser.Datums) (parser.
func (ti *tableInserter) finalize(ctx context.Context) error {
var err error
+ desc := ti.ri.Helper.TableDesc
+ tableStartKey := roachpb.Key(keys.MakeTablePrefix(uint32(desc.ID)))
+ tableEndKey := roachpb.Key(keys.MakeTablePrefix(uint32(desc.ID + 1)))
+ intentSpan := roachpb.Span{Key: tableStartKey, EndKey: tableEndKey}
if ti.autoCommit {
// An auto-txn can commit the transaction with the batch. This is an
// optimization to avoid an extra round-trip to the transaction
// coordinator.
err = ti.txn.CommitInBatch(ctx, ti.b)
} else {
err = ti.txn.Run(ctx, ti.b)
+ if ti.insertType == NoIntentTracking {
+ // In the NoIntentTracking case, the batch we just ran didn't record any
+ // intents in the TxnCoordSender. We'll send a RecordIntentsRequest to
+ // compensate for that.
+ recordIntentsBatch := ti.txn.NewBatch()
+ ri := &roachpb.RecordIntentsRequest{
+ IntentSpans: []roachpb.Span{intentSpan},
+ }
+ recordIntentsBatch.AddRawRequest(ri)
+ errRecIntents := ti.txn.Run(ctx, recordIntentsBatch)
+ if errRecIntents != nil {
+ log.Errorf(ctx, "error recording intents: %s", errRecIntents)
+ if err == nil {
+ err = errRecIntents
+ }
+ }
+ }
b.AddRawRequest(endTxnReq(true /* commit */, txn.deadline, txn.systemConfigTrigger))
return txn.Run(ctx, b)
+// CommitInBatchWithIntentSpan is like CommitInBatch, except that an IntentSpan
+// is set in the EndTransactionRequest. This is used when at least some of the
+// batches in the transaction had TxnCoordSenderIgnoreIntents set.
+func (txn *Txn) CommitInBatchWithIntentSpan(
+ ctx context.Context, b *Batch, intentSpan roachpb.Span,
+) error {
+ if txn != b.txn {
+ return errors.Errorf("a batch b can only be committed by b.txn")
+ }
+ et := endTxnReq(true /* commit */, txn.deadline, txn.systemConfigTrigger)
+ et.(*roachpb.EndTransactionRequest).IntentSpans = []roachpb.Span{intentSpan}
+ b.AddRawRequest(et)
+ return txn.Run(ctx, b)
+}
+
@@ -379,6 +379,7 @@ func TestCommonMethods(t *testing.T) {
{txnType, "AcceptUnhandledRetryableErrors"}: {},
{txnType, "Commit"}: {},
{txnType, "CommitInBatch"}: {},
+ {txnType, "CommitInBatchWithIntentSpan"}: {},
{txnType, "CommitOrCleanup"}: {},
{txnType, "Rollback"}: {},
{txnType, "CleanupOnError"}: {},
@@ -107,13 +113,38 @@ func (ti *tableInserter) row(ctx context.Context, values parser.Datums) (parser.
func (ti *tableInserter) finalize(ctx context.Context) error {
var err error
+ desc := ti.ri.Helper.TableDesc
+ tableStartKey := roachpb.Key(keys.MakeTablePrefix(uint32(desc.ID)))
+ tableEndKey := roachpb.Key(keys.MakeTablePrefix(uint32(desc.ID + 1)))
+ intentSpan := roachpb.Span{Key: tableStartKey, EndKey: tableEndKey}
if ti.autoCommit {
// An auto-txn can commit the transaction with the batch. This is an
// optimization to avoid an extra round-trip to the transaction
// coordinator.
- err = ti.txn.CommitInBatch(ctx, ti.b)
+ if ti.insertType == NoIntentTracking {
+ err = ti.txn.CommitInBatchWithIntentSpan(ctx, ti.b, intentSpan)
+ } else {
+ err = ti.txn.CommitInBatch(ctx, ti.b)
+ }
} else {
@@ -392,23 +409,25 @@ func (tc *TxnCoordSender) Send(
txnMeta := tc.txnMu.txns[txnID]
distinctSpans := true
if txnMeta != nil {
- et.IntentSpans = txnMeta.keys
+ et.IntentSpans = append(et.IntentSpans, txnMeta.keys...)
// Defensively set distinctSpans to false if we had any previous
// requests in this transaction. This effectively limits the distinct
// spans optimization to 1pc transactions.
distinctSpans = len(txnMeta.keys) == 0
}
@@ -366,12 +389,6 @@ func (tc *TxnCoordSender) Send(
return nil, roachpb.NewErrorf("EndTransaction must not have a Key set")
}
et.Key = ba.Txn.Key
- if len(et.IntentSpans) > 0 {
- // TODO(tschottdorf): it may be useful to allow this later.
- // That would be part of a possible plan to allow txns which
- // write on multiple coordinators.
- return nil, roachpb.NewErrorf("client must not pass intents to EndTransaction")
- }
}
}
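Before these two changes, the TxnCoordSender overwrote EndTransaction's IntentSpans with its own tracked spans and rejected requests that already carried some; either behaviour would defeat CommitInBatchWithIntentSpan. A minimal sketch of the difference, using hypothetical helpers over plain slices rather than the real roachpb types:

```go
package main

import "fmt"

type span struct{ Key, EndKey string }

// overwriteIntents models the old assignment: the client's spans are lost.
func overwriteIntents(clientSpans, tracked []span) []span {
	return tracked
}

// attachIntents models the new append: client spans are kept and the
// coordinator's tracked spans follow them. The client slice is copied first
// so the caller's backing array is never modified.
func attachIntents(clientSpans, tracked []span) []span {
	out := append([]span(nil), clientSpans...)
	return append(out, tracked...)
}

func main() {
	client := []span{{Key: "/Table/51", EndKey: "/Table/52"}}
	tracked := []span{{Key: "/some/other/key"}}
	fmt.Println(len(overwriteIntents(client, tracked)), len(attachIntents(client, tracked))) // 1 2
}
```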
root@:26257/dd> CREATE TABLE codelab.series AS SELECT generate_series FROM generate_series(1, 100100);
SELECT 100100
root@:26257/dd> select count(1) from codelab.series;
+----------+
| count(1) |
+----------+
| 100100 |
+----------+
(1 row)
root@:26257/dd> begin;
Now adding input for a multi-line SQL transaction client-side.
Press Enter two times to send the SQL text collected so far to the server, or Ctrl+C to cancel.
->
BEGIN
root@:26257/dd OPEN> CREATE TABLE codelab.series2 AS SELECT generate_series FROM generate_series(1, 100100);
SELECT 100100
root@:26257/dd OPEN> commit;
COMMIT
root@:26257/dd> SELECT count(1) FROM codelab.series2;
+----------+
| count(1) |
+----------+
| 100100 |
+----------+
(1 row)
- mark the txn record as committed
- asynchronously clean up all the intents
- once all intents have been successfully cleaned up, delete the transaction record.
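Those three steps are why the intent spans attached to EndTransaction must cover every write: cleanup only visits the listed spans, so an intent outside them is left behind after the record is deleted. A toy model of the steps follows; `store`, `txnRecord`, `resolveSpan` and `commit` are hypothetical, string comparison stands in for CockroachDB's binary key ordering, and the sketch waits for its cleanup goroutines instead of letting them run in the background.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// span is a simplified [Key, EndKey) span; an empty EndKey means a single key.
type span struct{ Key, EndKey string }

// store tracks which keys still hold unresolved intents.
type store struct {
	mu      sync.Mutex
	intents map[string]bool // key -> still an unresolved intent?
}

// txnRecord is a stand-in for the transaction record.
type txnRecord struct {
	status  string
	deleted bool
}

// resolveSpan resolves every pending intent whose key falls inside s.
func (st *store) resolveSpan(s span) {
	st.mu.Lock()
	defer st.mu.Unlock()
	for k, pending := range st.intents {
		in := k == s.Key
		if s.EndKey != "" {
			in = k >= s.Key && k < s.EndKey
		}
		if pending && in {
			st.intents[k] = false // updating an existing key while ranging is allowed
		}
	}
}

// commit runs the three steps listed above and returns the intents that no
// span covered, which are left behind.
func commit(rec *txnRecord, st *store, spans []span) []string {
	rec.status = "COMMITTED" // 1. mark the record as committed

	var wg sync.WaitGroup // 2. clean up the intents, one goroutine per span
	for _, s := range spans {
		wg.Add(1)
		go func(s span) {
			defer wg.Done()
			st.resolveSpan(s)
		}(s)
	}
	wg.Wait()

	rec.deleted = true // 3. all listed intents resolved: delete the record

	var leftover []string
	for k, pending := range st.intents {
		if pending {
			leftover = append(leftover, k)
		}
	}
	sort.Strings(leftover)
	return leftover
}

func main() {
	st := &store{intents: map[string]bool{
		"/Table/51/1/1": true,
		"/Table/51/1/2": true,
		"/Table/51/2/1": true, // not covered by the span below
	}}
	rec := &txnRecord{}
	left := commit(rec, st, []span{{Key: "/Table/51/1", EndKey: "/Table/51/2"}})
	fmt.Println(rec.status, rec.deleted, left) // COMMITTED true [/Table/51/2/1]
}
```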
Copyright (C) Cockroach Labs.
Attention: This documentation is provided on an "as is" basis, without warranties or conditions of any kind, either express or implied, including, without limitation, any warranties or conditions of title, non-infringement, merchantability, or fitness for a particular purpose.