Skip to content

Commit a37549f

Browse files
committed
support BinlogPosition
1 parent 7841948 commit a37549f

File tree

7 files changed

+675
-19
lines changed

7 files changed

+675
-19
lines changed

README.md

Lines changed: 84 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44
[![MyGet Version](https://img.shields.io/myget/scisharp/vpre/SciSharp.MySQL.Replication)](https://www.myget.org/feed/scisharp/package/nuget/SciSharp.MySQL.Replication)
55
[![NuGet Version](https://img.shields.io/nuget/v/SciSharp.MySQL.Replication.svg?style=flat)](https://www.nuget.org/packages/SciSharp.MySQL.Replication/)
66

7-
dotnet-mysql-replication is a C# Implementation of MySQL replication protocol client. This allows you to receive events like insert, update, delete with their data and raw SQL queries from MySQL.
7+
A C# Implementation of MySQL replication protocol client
8+
9+
This library allows you to receive events like insert, update, delete with their data and raw SQL queries from MySQL.
810

911
## Features
1012

@@ -21,6 +23,8 @@ dotnet-mysql-replication is a C# Implementation of MySQL replication protocol cl
2123
- Checksum verification support
2224
- Built-in support for MySQL binary format parsing
2325
- Async/await first design
26+
- Track and save binary log position
27+
- Start replication from a specific binary log position
2428

2529
## Requirements
2630

@@ -67,6 +71,79 @@ client.StartReceive();
6771
await client.CloseAsync();
6872
```
6973

74+
## Using Async Stream API
75+
76+
You can use the modern C# async stream pattern to process MySQL events using `GetEventLogStream()`:
77+
78+
```csharp
79+
using SciSharp.MySQL.Replication;
80+
using SciSharp.MySQL.Replication.Events;
81+
82+
var client = new ReplicationClient();
83+
var result = await client.ConnectAsync("localhost", "root", "password", 1);
84+
85+
if (!result.Result)
86+
{
87+
Console.WriteLine($"Failed to connect: {result.Message}.");
88+
return;
89+
}
90+
91+
// Process events as they arrive using await foreach
92+
await foreach (var logEvent in client.GetEventLogStream())
93+
{
94+
switch (logEvent)
95+
{
96+
case WriteRowsEvent writeEvent:
97+
Console.WriteLine($"INSERT on table: {writeEvent.TableId}");
98+
break;
99+
100+
case UpdateRowsEvent updateEvent:
101+
Console.WriteLine($"UPDATE on table: {updateEvent.TableId}");
102+
break;
103+
104+
case QueryEvent queryEvent:
105+
Console.WriteLine($"SQL Query: {queryEvent.Query}");
106+
break;
107+
108+
// Handle other event types as needed
109+
}
110+
}
111+
112+
await client.CloseAsync();
113+
```
114+
115+
This approach is useful for:
116+
- Modern C# applications using .NET Core 3.0+
117+
- Processing events sequentially in a more fluent, readable way
118+
- Easier integration with async/await patterns
119+
- Avoiding event handler callback complexity
120+
121+
## Position Tracking and Custom Starting Position
122+
123+
You can track the current binary log position and start from a specific position:
124+
125+
```csharp
126+
using SciSharp.MySQL.Replication;
127+
128+
var client = new ReplicationClient();
129+
130+
// Track position changes
131+
client.PositionChanged += (sender, position) =>
132+
{
133+
Console.WriteLine($"Current position: {position}");
134+
// Save position to a file, database, etc.
135+
File.WriteAllText("binlog-position.txt", $"{position.Filename}:{position.Position}");
136+
};
137+
138+
// Start from a specific position
139+
var startPosition = new BinlogPosition("mysql-bin.000001", 4);
140+
var result = await client.ConnectAsync("localhost", "root", "password", 1, startPosition);
141+
142+
// Get current position at any time
143+
var currentPosition = client.CurrentPosition;
144+
Console.WriteLine($"Current log file: {currentPosition.Filename}, position: {currentPosition.Position}");
145+
```
146+
70147
## Advanced Usage
71148

72149
### Working with Specific Events
@@ -104,7 +181,6 @@ client.PackageHandler += (s, e) =>
104181
{
105182
Console.WriteLine($" Column: {cell.ColumnIndex}, Value: {cell.Value}");
106183
}
107-
108184
Console.WriteLine(" After update:");
109185
foreach (var cell in row.AfterUpdate)
110186
{
@@ -127,7 +203,12 @@ client.PackageHandler += (s, e) =>
127203

128204
case QueryEvent queryEvent:
129205
Console.WriteLine($"SQL Query: {queryEvent.Query}");
130-
Console.WriteLine($"Database: {queryEvent.DatabaseName}");
206+
Console.WriteLine($"Database: {queryEvent.Schema}");
207+
break;
208+
209+
case RotateEvent rotateEvent:
210+
Console.WriteLine($"Rotating to new binary log: {rotateEvent.NextBinlogFileName}");
211+
Console.WriteLine($"New position: {rotateEvent.RotatePosition}");
131212
break;
132213
}
133214
};
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
using System;
2+
3+
namespace SciSharp.MySQL.Replication
4+
{
5+
/// <summary>
6+
/// Represents a position in a MySQL binary log file.
7+
/// </summary>
8+
public class BinlogPosition
9+
{
10+
/// <summary>
11+
/// Gets or sets the filename of the binary log.
12+
/// </summary>
13+
public string Filename { get; set; }
14+
15+
/// <summary>
16+
/// Gets or sets the position within the binary log file.
17+
/// </summary>
18+
public int Position { get; set; }
19+
20+
/// <summary>
21+
/// Initializes a new instance of the <see cref="BinlogPosition"/> class.
22+
/// </summary>
23+
public BinlogPosition()
24+
{
25+
}
26+
27+
/// <summary>
28+
/// Initializes a new instance of the <see cref="BinlogPosition"/> class by copying another instance.
29+
/// </summary>
30+
/// <param name="binlogPosition">The other binlogPosition.</param>
31+
public BinlogPosition(BinlogPosition binlogPosition)
32+
{
33+
Filename = binlogPosition.Filename;
34+
Position = binlogPosition.Position;
35+
}
36+
37+
/// <summary>
38+
/// Initializes a new instance of the <see cref="BinlogPosition"/> class.
39+
/// </summary>
40+
/// <param name="filename">The binary log filename.</param>
41+
/// <param name="position">The position within the binary log file.</param>
42+
public BinlogPosition(string filename, int position)
43+
{
44+
Filename = filename ?? throw new ArgumentNullException(nameof(filename));
45+
Position = position;
46+
}
47+
48+
/// <summary>
49+
/// Returns a string representation of the binlog position.
50+
/// </summary>
51+
/// <returns>A string containing the filename and position.</returns>
52+
public override string ToString()
53+
{
54+
return $"{Filename}:{Position}";
55+
}
56+
}
57+
}

src/SciSharp.MySQL.Replication/IReplicationClient.cs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using System.Threading.Tasks;
33
using SuperSocket.Client;
44
using SciSharp.MySQL.Replication.Events;
5+
using System.Collections.Generic;
56

67
namespace SciSharp.MySQL.Replication
78
{
@@ -20,12 +21,39 @@ public interface IReplicationClient
2021
/// <returns>A task that represents the asynchronous login operation and contains the login result.</returns>
2122
Task<LoginResult> ConnectAsync(string server, string username, string password, int serverId);
2223

24+
/// <summary>
25+
/// Connects to a MySQL server with the specified credentials and starts replication from a specific binlog position.
26+
/// </summary>
27+
/// <param name="server">The server address to connect to.</param>
28+
/// <param name="username">The username for authentication.</param>
29+
/// <param name="password">The password for authentication.</param>
30+
/// <param name="serverId">The server ID to use for the replication client.</param>
31+
/// <param name="binlogPosition">The binary log position to start replicating from.</param>
32+
/// <returns>A task that represents the asynchronous login operation and contains the login result.</returns>
33+
Task<LoginResult> ConnectAsync(string server, string username, string password, int serverId, BinlogPosition binlogPosition);
34+
35+
/// <summary>
36+
/// Gets the current binary log position.
37+
/// </summary>
38+
BinlogPosition CurrentPosition { get; }
39+
40+
/// <summary>
41+
/// Event triggered when the binary log position changes.
42+
/// </summary>
43+
event EventHandler<BinlogPosition> PositionChanged;
44+
2345
/// <summary>
2446
/// Receives the next log event from the server.
2547
/// </summary>
2648
/// <returns>A task representing the asynchronous receive operation and containing the log event.</returns>
2749
ValueTask<LogEvent> ReceiveAsync();
2850

51+
/// <summary>
52+
/// Asynchronously streams log events from the server.
53+
/// This method will yield log events as they are received.
54+
/// </summary>
55+
IAsyncEnumerable<LogEvent> GetEventLogStream();
56+
2957
/// <summary>
3058
/// Closes the connection to the server.
3159
/// </summary>

0 commit comments

Comments
 (0)