码迷,mamicode.com
首页 > 编程语言 > 详细

线程安全队列

时间:2018-03-18 13:09:22      阅读:218      评论:0      收藏:0      [点我收藏+]

标签:name   status   lin   dirty   rem   safe   64位系统   while   virt   

线程安全队列

// 队列的实现 2011-06-26
unit sfContnrs;

interface

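{.$DEFINE MULTI_THREAD_QUEUE} // Thread-safe build switch. NOTE: the leading '.' disables this directive, so MULTI_THREAD_QUEUE is not defined and the TsfQueue/TIntegerQueue locks compile to no-ops; remove the '.' to enable them.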

{.$IFDEF MULTI_THREAD_QUEUE}
uses
Windows,classes,SysUtils;//,sfString;
{.$ENDIF}

type
// FIFO queue of raw pointers stored in one flat, growable buffer.
// 2014-03-01: adjusted for 64-bit targets.
// The locking code is compiled in only when MULTI_THREAD_QUEUE is defined and
// is executed only when ThreadSefe is TRUE.
TsfQueue = class
private
// Number of slots currently allocated in FBuff.
FCapacity: Integer;
// Spare buffer that Pop swaps in when it compacts; nil while FCapacity < 2048.
FTmpBuff: Pointer;
// Active slot storage.
FBuff: Pointer;
// Maximum number of queued items; 0 means no limit.
FMaxLength:DWORD;
// TRUE when built by Create, FALSE when built by CreateNoThreadSafe.
FThreadSafe:Boolean;
{$IFDEF MULTI_THREAD_QUEUE}
FCS: TRTLCriticalSection;
{$ENDIF}
// \\
// Slot index written by the next Push.
FPushIndex: Integer;
// Slot index read by the next Pop.
FPopIndex: Integer;

// Grows the buffers when Value exceeds FCapacity (never shrinks); takes no lock.
procedure Inernal_SetCapacity(const Value: Integer);
// \\
procedure setCapacity(const Value: Integer);
function getCapacity: Integer;
function getEmpty: Boolean;
function getLength: Integer;
public //2015-07-12 protected==>public
procedure Lock();
procedure UnLock();
public
// pvMaxLength: maximum allowed queue length, 0 = unlimited.
// InitCapacity values below 1024 are raised to 1024.
constructor Create(InitCapacity: Integer = 1024;pvMaxLength:DWORD=0);
constructor CreateNoThreadSafe(InitCapacity:Integer=1024;pvMaxLength:DWORD=0);
destructor Destroy(); override;
// \\
// Appends AItem; returns AItem, or nil when the MaxLength limit is reached.
function Push(AItem: Pointer): Pointer; virtual;
// Removes and returns the oldest item, or nil when the queue is empty.
function Pop(): Pointer; virtual;
// Resets both indexes to 0; the stored pointers themselves are not freed.
procedure Clear();virtual;
public
property ThreadSefe:Boolean read FThreadSafe;// write FThreadSafe;
property Capacity: Integer read getCapacity write setCapacity;
property Empty: Boolean read getEmpty;
// Number of items currently queued (FPushIndex - FPopIndex).
property Length: Integer read getLength;
property MaxLength:DWORD read FMaxLength;
end;

// Added for 64-bit compilation compatibility (2014-03-01 17:35).
// FIFO queue of Integer values; same buffer scheme as TsfQueue.
// Locking is compiled in only when MULTI_THREAD_QUEUE is defined.
TIntegerQueue = class
private
// Number of slots currently allocated in FBuff.
FCapacity: Integer;
// Spare buffer that Pop swaps in when it compacts; nil while FCapacity < 2048.
FTmpBuff: Pointer;
// Active slot storage.
FBuff: Pointer;
// Maximum number of queued items; 0 means no limit.
FMaxLength:DWORD;
{$IFDEF MULTI_THREAD_QUEUE}
FCS: TRTLCriticalSection;
{$ENDIF}
// Slot index written by the next Push.
FPushIndex: Integer;
// Slot index read by the next Pop.
FPopIndex: Integer;

// Grows the buffers when Value exceeds FCapacity (never shrinks); takes no lock.
procedure Inernal_SetCapacity(const Value: Integer);
// \\
procedure setCapacity(const Value: Integer);
function getCapacity: Integer;
function getEmpty: Boolean;
function getLength: Integer;
protected
procedure Lock();
procedure UnLock();
public
// pvMaxLength: maximum allowed queue length, 0 = unlimited.
// InitCapacity values below 1024 are raised to 1024.
constructor Create(InitCapacity: Integer = 1024;pvMaxLength:DWORD=0);
destructor Destroy(); override;
// \\
// Appends AItem; returns AItem, or 0 when the MaxLength limit is reached.
function Push(AItem: Integer): Integer;
// Removes and returns the oldest value; returns 0 when the queue is empty,
// which is indistinguishable from a stored 0.
function Pop(): Integer; virtual;
// Resets both indexes to 0.
procedure Clear();virtual;
public
property Capacity: Integer read getCapacity write setCapacity;
property Empty: Boolean read getEmpty;
// Number of items currently queued (FPushIndex - FPopIndex).
property Length: Integer read getLength;
property MaxLength:DWORD read FMaxLength;
end;

// Hash table implementation, 2015-02-05 10:43
// Optional array of initial Value pointers handed to THashFIFO.Create.
TArrayOfPointer = array of Pointer;
PPsfHashItem = ^PsfHashItem;
PsfHashItem = ^TsfHashItem;

// One pooled hash-table entry; entries in the same bucket are chained via Next.
TsfHashItem = record
// Next entry in the same bucket chain, or nil.
Next: PsfHashItem;
Key: Int64;
Value: Pointer; // TCacheMemoryBlock;
// $01 while handed out by NewBucket, $00 once returned by DisposeBucket.
Used: Integer;
// Position of this entry in THashFIFO.BucketPool.
Index: Integer;
// Bucket index the entry was linked under (HashOf(Key) mod bucket count).
Hash: Integer;
R: Integer; // padding for alignment
end;

// Raised by THashFIFO.DisposeBucket just before Value is returned to the free stack.
TEventOnDisposeBucket = procedure(Sender: TObject; Value: PsfHashItem) of object;

{
//First in First out(FIFO)
I am first-in first-out: a low-overhead algorithm with modest demands on how
cached objects are managed. I track all cached objects through a queue, with
the most recent objects at the back and the older ones at the front. When the
cache is full, the objects at the front are evicted and the new object is
added. I am fast, but I am not adaptive.
}
// Fixed-size hash table (Int64 key -> Pointer value) whose entries come from a
// preallocated pool; when the pool is exhausted an existing entry is evicted
// and reused. The class contains no locking of its own.
THashFIFO = class //First in First out(FIFO)
private
// Stack of currently free pool entries.
FBucketStacks: array of PsfHashItem;
// Index of the top element of FBucketStacks; below FStackBottom when empty.
FStackIndex: Integer;
FStatckTop: Integer; // highest valid stack position
FStackBottom: Integer; // lowest valid stack position
// Pool index of the next entry to evict; -1 until the first Pop.
FCurPopIndex: Integer;
FOnDisposeBucket: TEventOnDisposeBucket;
// Returns an entry to the free stack; result is its stack position or -1 when full.
function Push(Value: PsfHashItem): Integer;
// Takes a free entry; when none is free, evicts linked entries until one is.
function Pop(): PsfHashItem;
function GetFreeCount: Integer; // getter for FreeCount
// Links a fresh entry under Key without touching its Value; returns that Value.
function Bind(const Key: Int64): Pointer;
protected
// Bucket heads, indexed by HashOf(Key) mod Length(Buckets).
Buckets: array of PsfHashItem;
// Every entry ever allocated, indexed by TsfHashItem.Index.
BucketPool: array of PsfHashItem;
// Returns the address of the link that points at Key's entry, or of the
// terminating nil link when Key is absent.
function Find(const Key: Int64): PPsfHashItem;
function HashOf(const Key: Int64): Cardinal; virtual;
procedure DisposeBucket(Value: PsfHashItem); virtual;
procedure Clear;
procedure Remove(const Key: Int64); overload;
procedure Remove(const Hash: Integer; const Key: Int64); overload;
function Modify(const Key: Int64; Value: Pointer): Boolean;
public
// Size is both the bucket count and the pool size; pvInitValue optionally
// supplies the initial Value of each pooled entry.
constructor Create(Size: Cardinal; pvInitValue: TArrayOfPointer = nil);
destructor Destroy; override;
function NewBucket(): PsfHashItem;
// Returns the pool index of the entry used, or -1 when no entry is available.
function Add(const Key: Int64; Value: Pointer): Integer;
// Returns the stored Value, or nil when Key is absent.
function ValueOf(const Key: Int64): Pointer;
public
property FreeCount: Integer read GetFreeCount; // number of free entries
property OnDisposeBucket: TEventOnDisposeBucket read FOnDisposeBucket
write FOnDisposeBucket;
end;

 

//2015-02-06 13:51
// Circular array; the maximum number of elements must be set before use.
// Once the maximum capacity is exceeded, the earliest pushed element is
// dropped and the newly pushed element takes its place.
// OnElementPush fires before a value is stored, giving the handler a chance
// to deal with the element that is about to be replaced.
// OldValue is the current content of the target slot; NewValue is passed by
// reference so the handler can change what gets stored.
TElementFreeEvent = procedure(Sender:TObject;const OldValue:NativeInt;
var NewValue:NativeInt) of object;

// Fixed-capacity circular buffer of NativeInt values with optional locking
// and stream/file persistence.
TCircleArrayOfNativeInt=class
private
// Set when the write position has wrapped around; cleared by Pop on an empty buffer.
FOverWrap:Boolean;
// Initialised and used only when FThreadSafe is TRUE.
FCS:TRTLCriticalSection;
FDataArray:array of NativeInt;
// Index written by the next Push.
FWritePosition:Integer;
// Index read by the next Pop.
FReadPosition:Integer;
// Number of stored elements.
FDataCount:Integer;
// Capacity of FDataArray in elements.
FDataSize:Integer;
FOnElementFree:TElementFreeEvent;
FThreadSafe:Boolean;
// Set by Push and Pop, cleared by SaveToStream.
FDirty:Boolean;
function getDataCount: Integer;
protected
// Called by Push before the slot is written; forwards to OnElementFree when assigned.
procedure OnElementPush(const OldValue:NativeInt;var NewValue:NativeInt);virtual;
public
FID:array[1..32] of Ansichar;// data owner identifier
constructor Create(Size:Integer;AThreadSafe:Boolean);
destructor Destroy();override;
procedure Lock();
procedure UnLock();
function Push(Value:NativeInt):Integer;
// Returns FALSE when the buffer is empty; otherwise stores the oldest element in Value.
function Pop(var Value:NativeInt):Boolean;
procedure SaveToFile(const AFileName:string);
function SaveToStream(AStream:TStream):Integer;
// Returns nil when the file does not exist.
class function LoadfromFile(const AFileName:string):TCircleArrayOfNativeInt;
class function LoadFromStream(AStream:TStream):TCircleArrayOfNativeInt;
public
property Dirty:Boolean read FDirty;
property ThreadSafe:Boolean read FThreadSafe;
property DataCount:Integer read getDataCount;
property OverWrap:Boolean read FOverWrap;
property OnElementFree:TElementFreeEvent read FOnElementFree write FOnElementFree;
end;

// Header record written by TCircleArrayOfNativeInt.SaveToStream after the
// leading marker byte. It is read and written as raw bytes, so the field
// order and sizes are part of the persisted format.
TCircleArrayHeader=record
// Marker byte + header + element data, in bytes.
FileSize : Integer;
DataSize : Integer;// array capacity
DataCount : Integer;
WritePosition : Integer;
ReadPosition : Integer;
// Timestamp taken with Now() when the header is written.
WriteDate:TDateTime;
OverWrap:Boolean;
ThreadSafe:Boolean;
ID:array[1..32] of Ansichar;// data owner identifier
R:array[1..2] of Ansichar;// padding for alignment
end;

PFixQueueNode=^TFixQueueNode;
// One slot of TFixThreadQueue; InitQueue links the slots into a closed ring.
TFixQueueNode=record
Data:NativeInt;
Status:Integer; //0: free; 1: holds data
// Following slot in the ring.
Next:PFixQueueNode;
end;

// Fixed-length queue (thread-safe) 2015-07-24 09:30
// Backed by a ring of TFixQueueNode slots allocated as one memory block.
TFixThreadQueue=class
private
FCS:TRTLCriticalSection;
FSize:Integer;// queue capacity
FDataCount:Integer;// number of items currently in the queue
// Slot the next Push writes to.
FPushNode:PFixQueueNode;
// Slot the next Pop reads from.
FPopNode:PFixQueueNode;
// Start of the memory block that holds all slots.
FMemPtr:Pointer;
function GetDataCount: Integer;
procedure InitQueue(QueueSize:Integer);// builds the slot ring
public
constructor Create(ASize:Integer=256);// default capacity: 256 elements
destructor Destroy();override;
// Push/Pop/PickUp return FALSE when the slot at the respective position is
// not in the required state (full for Push, empty for Pop/PickUp).
// Pass NeedLock=FALSE only when the caller already holds the lock.
function Push(Data:NativeInt;NeedLock:Boolean=TRUE):Boolean;
function Pop(var Data:NativeInt;NeedLock:Boolean=TRUE):Boolean;
// NOTE(review): Data is passed by value here, so the value read by PickUp is
// not returned to the caller - confirm whether "var" was intended.
function PickUp(Data:NativeInt;NeedLock:Boolean=TRUE):Boolean;// like Pop but does not advance the pointer 2017/05/01
procedure Lock();
procedure UnLock();
public
property Size:Integer read FSize;// queue capacity
property MemPtr:Pointer read FMemPtr;
property DataCount:Integer read GetDataCount; // number of items currently in the queue
end;


// Timer queue; thread-safe; (2016/08/26 16:23)
// 2017/06/19 14:38 adjusted for 64-bit
PWTimeNode=^TWTimeNode;
// One timer registration held by TsfTimerWheel.
TWTimeNode=record
Slot:Integer; // internal: wheel position at which the timer fires; -1 = unused
Index:Integer; // internal: position of this node in FTimeNodes
TimeOutSec:DWORD;// timeout in seconds
Tag:Integer; // for the user
UserData:Pointer;// for the user
Next:PWTimeNode; // internal: link used while building the list of expired nodes
end;

// Timer wheel running on its own thread: once per tick it collects the nodes
// whose Slot equals the current clock position and hands them to OnTimer.
TsfTimerWheel=class(TThread) //2016/08/26 15:36 timer resolution: 1 second
private
// Current wheel position, advanced modulo FMaxSecCount on every tick.
FClockPosition:Integer;
FMaxSecCount:DWORD;
FCS:TRTLCriticalSection;//TCriticalSection;
// TRUE while Execute is running.
FRunging:Boolean;
protected
FTimeNodes:array of TWTimeNode; //array[0..AMaxNodeCount - 1] of TWTimeNode
procedure Execute();override;

// Receives the linked list of nodes that expired on this tick (AFirstNode is the list head).
// After the callback returns, Execute waits until AWorkDone = true before it continues;
// this lets OnTimer start asynchronous work and set AWorkDone=true when it completes //added 2017/06/19
procedure OnTimer(AFirstNode:PWTimeNode;var AWorkDone:Boolean);virtual;abstract; // usage example: see the end of this file
public
//AMaxNodeCount: maximum number of timers handled;
//AMaxSecCount: maximum timeout in seconds; timer resolution is 1 second
constructor Create(AMaxSecCount:DWORD;AMaxNodeCount:DWORD);
destructor Destroy();override;
//\\
//>=0 success: the internal index (used to delete the timer)
//-1 queue full
//-2 timeout exceeds the upper limit in seconds ---- MaxSecs
function AddTimer(UserData:Pointer;Tag:Integer;ATimeOutSec:DWORD):Integer;
function DeleteTimer(AIndex:Integer):Boolean;
//\\
procedure Lock();
procedure UnLock();
public
property Runging:Boolean read FRunging;
property MaxSecs:DWORD read FMaxSecCount;// upper timeout limit (seconds)
end;

 

implementation

type
// Memory stream whose SaveToStream writes only the bytes before the current
// Position instead of the whole buffer.
TInnerMemoryStream=class(TMemoryStream)
public
procedure SaveToStream(Stream: TStream); override;
end;

{ TsfQueue }

// Empties the queue by rewinding both indexes; buffers stay allocated and the
// pointers that were stored are not freed.
procedure TsfQueue.Clear;
begin
  Lock();
  try
    FPushIndex := 0;
    FPopIndex := 0;
  finally
    UnLock();
  end;
end;

// Builds a queue flagged as thread-safe.
// InitCapacity: initial slot count, raised to at least 1024.
// pvMaxLength: maximum queue length, 0 = unlimited.
constructor TsfQueue.Create(InitCapacity: Integer;pvMaxLength:DWORD);
var
  StartCapacity: Integer;
begin
  FThreadSafe := TRUE;
  {$IFDEF MULTI_THREAD_QUEUE}
  if FThreadSafe then
    InitializeCriticalSection(FCS);
  {$ENDIF}

  StartCapacity := InitCapacity;
  if StartCapacity < 1024 then
    StartCapacity := 1024;

  FMaxLength := pvMaxLength;
  Inernal_SetCapacity(StartCapacity);
end;

// Builds a queue flagged as NOT thread-safe: Lock/UnLock become no-ops for it
// and no critical section is initialised.
// InitCapacity: initial slot count, raised to at least 1024.
// pvMaxLength: maximum queue length, 0 = unlimited.
constructor TsfQueue.CreateNoThreadSafe(InitCapacity: Integer;
  pvMaxLength: DWORD);
var
  StartCapacity: Integer;
begin
  FThreadSafe := FALSE;
  {$IFDEF MULTI_THREAD_QUEUE}
  if FThreadSafe then
    InitializeCriticalSection(FCS);
  {$ENDIF}

  StartCapacity := InitCapacity;
  if StartCapacity < 1024 then
    StartCapacity := 1024;

  FMaxLength := pvMaxLength;
  Inernal_SetCapacity(StartCapacity);
end;

// Releases both buffers and, for thread-safe instances, the critical section.
destructor TsfQueue.Destroy;
begin
  if FTmpBuff <> nil then
    FreeMem(FTmpBuff);
  FreeMem(FBuff);
  {$IFDEF MULTI_THREAD_QUEUE}
  if FThreadSafe then
    DeleteCriticalSection(FCS);
  {$ENDIF}
  inherited Destroy;
end;

// Enters the queue's critical section. Does nothing unless the unit is
// compiled with MULTI_THREAD_QUEUE and the instance is flagged thread-safe.
procedure TsfQueue.Lock;
begin
  {$IFDEF MULTI_THREAD_QUEUE}
  if not FThreadSafe then
    Exit;
  EnterCriticalSection(FCS);
  {$ENDIF}
end;

// Leaves the queue's critical section. Does nothing unless the unit is
// compiled with MULTI_THREAD_QUEUE and the instance is flagged thread-safe.
procedure TsfQueue.UnLock;
begin
  {$IFDEF MULTI_THREAD_QUEUE}
  if not FThreadSafe then
    Exit;
  LeaveCriticalSection(FCS);
  {$ENDIF}
end;

// Grows the queue storage so it can hold at least Value slots; requests that
// do not exceed the current capacity are ignored (the queue never shrinks).
// The byte size is rounded up to a multiple of 4096, so the resulting
// capacity may be larger than Value. Takes no lock - callers hold it.
//
// Both new buffers are allocated before any field is changed. The original
// freed FTmpBuff first and left the stale pointer in place, so a failing
// GetMem led to a double free in the destructor.
procedure TsfQueue.Inernal_SetCapacity(const Value: Integer);
const
  PAGE_BYTES = 4096;
var
  PageCount, ByteSize, NewCapacity: Integer;
  NewBuff, NewTmp: Pointer;
begin
  if Value <= FCapacity then
    Exit;

  // Bytes needed, rounded up to whole pages.
  ByteSize := Value * SizeOf(Pointer);
  PageCount := ByteSize div PAGE_BYTES;
  if (ByteSize mod PAGE_BYTES) > 0 then
    Inc(PageCount);
  ByteSize := PageCount * PAGE_BYTES;
  NewCapacity := ByteSize div SizeOf(Pointer);

  // Allocate everything up front; on failure the queue state is untouched.
  GetMem(NewBuff, ByteSize);
  NewTmp := nil;
  try
    // The spare buffer is what Pop swaps in when it compacts the queue.
    if NewCapacity >= 2048 then
      GetMem(NewTmp, ByteSize);
  except
    FreeMem(NewBuff);
    raise;
  end;

  // Carry the existing slots over into the zero-filled new storage.
  FillChar(NewBuff^, ByteSize, #0);
  if FBuff <> nil then
  begin
    Move(FBuff^, NewBuff^, FCapacity * SizeOf(Pointer));
    FreeMem(FBuff);
  end;
  if FTmpBuff <> nil then
    FreeMem(FTmpBuff);

  FBuff := NewBuff;
  FTmpBuff := NewTmp;
  FCapacity := NewCapacity;
end;

// Removes and returns the oldest pointer, or nil when the queue is empty.
//
// Slots are addressed through PPointer so that each slot is SizeOf(Pointer)
// bytes wide, matching the buffer size computed in Inernal_SetCapacity. The
// original used PInteger, which truncated pointers and used a 4-byte stride
// on 64-bit targets.
function TsfQueue.Pop: Pointer;
const
  // Number of consumed slots at the front that triggers a compaction.
  _MoveRange_ = 2048;

  // Copies the live tail of FBuff to the start of FTmpBuff and swaps the two
  // buffers, so the consumed slots at the front are reclaimed.
  procedure AdjuestMem();
  var
    pSrc: PPointer;
    pTmp: Pointer;
  begin
    FillChar(FTmpBuff^, FCapacity * SizeOf(Pointer), #0);
    pSrc := PPointer(FBuff);
    Inc(pSrc, FPopIndex);
    Move(pSrc^, FTmpBuff^, (FCapacity - FPopIndex) * SizeOf(Pointer));
    // Swap the buffers.
    pTmp := FBuff;
    FBuff := FTmpBuff;
    FTmpBuff := pTmp;
  end;

var
  Slot: PPointer;
begin
  Lock();
  try
    Result := nil;
    if FPopIndex = FPushIndex then
      Exit;
    Slot := PPointer(FBuff);
    Inc(Slot, FPopIndex);
    Result := Slot^;
    Inc(FPopIndex);
    // Once _MoveRange_ slots at the front are free, move everything down.
    if FPopIndex = _MoveRange_ then
    begin
      AdjuestMem();
      FPopIndex := 0;
      Dec(FPushIndex, _MoveRange_);
    end;
  finally
    UnLock();
  end;
end;

// Appends AItem to the queue.
// Returns AItem on success, or nil when MaxLength is set and already reached.
// Stores the full pointer value (see Pop for the slot addressing).
function TsfQueue.Push(AItem: Pointer): Pointer;
var
  Slot: PPointer;
  Len: Integer;
begin
  Lock();
  try
    Result := nil;
    Len := FPushIndex - FPopIndex; // current queue length
    if (FMaxLength > 0) and (Len >= FMaxLength) then
      Exit; // length limit reached
    Slot := PPointer(FBuff);
    Inc(Slot, FPushIndex);
    Slot^ := AItem;
    Inc(FPushIndex);
    // Grow by 1024 slots as soon as the last slot has been used.
    if FPushIndex >= FCapacity then
      Inernal_SetCapacity(FCapacity + 1024);
    Result := AItem;
  finally
    UnLock();
  end;
end;

// Property setter for Capacity: grows the storage under the lock.
// Values that do not exceed the current capacity have no effect.
procedure TsfQueue.setCapacity(const Value: Integer);
begin
  Self.Lock();
  try
    Self.Inernal_SetCapacity(Value);
  finally
    Self.UnLock();
  end;
end;

// Property getter for Capacity: number of slots currently allocated.
function TsfQueue.getCapacity: Integer;
begin
  Lock();
  try
    Result := FCapacity;
  finally
    UnLock();
  end;
end;

// Property getter for Empty: TRUE when the push and pop indexes coincide.
function TsfQueue.getEmpty: Boolean;
begin
  Lock();
  try
    Result := (FPopIndex = FPushIndex);
  finally
    UnLock();
  end;
end;

// Property getter for Length: number of items currently queued.
function TsfQueue.getLength: Integer;
begin
  Lock();
  try
    Result := FPushIndex - FPopIndex;
  finally
    UnLock();
  end;
end;

{ TIntegerQueue }

// Empties the queue by rewinding both indexes; the buffers stay allocated.
procedure TIntegerQueue.Clear;
begin
  Lock();
  try
    FPushIndex := 0;
    FPopIndex := 0;
  finally
    UnLock();
  end;
end;

// InitCapacity: initial slot count, raised to at least 1024.
// pvMaxLength: maximum queue length, 0 = unlimited.
constructor TIntegerQueue.Create(InitCapacity: Integer;pvMaxLength:DWORD);
var
  StartCapacity: Integer;
begin
  {$IFDEF MULTI_THREAD_QUEUE}
  InitializeCriticalSection(FCS);
  {$ENDIF}

  StartCapacity := InitCapacity;
  if StartCapacity < 1024 then
    StartCapacity := 1024;

  FMaxLength := pvMaxLength;
  Inernal_SetCapacity(StartCapacity);
end;

// Releases both buffers and (when compiled in) the critical section.
destructor TIntegerQueue.Destroy;
begin
  if FTmpBuff <> nil then
    FreeMem(FTmpBuff);
  FreeMem(FBuff);
  {$IFDEF MULTI_THREAD_QUEUE}
  DeleteCriticalSection(FCS);
  {$ENDIF}
  inherited Destroy;
end;

// Enters the critical section; compiles to an empty body unless
// MULTI_THREAD_QUEUE is defined.
procedure TIntegerQueue.Lock;
begin
  {$IFDEF MULTI_THREAD_QUEUE}
  EnterCriticalSection(Self.FCS);
  {$ENDIF}
end;

// Leaves the critical section; compiles to an empty body unless
// MULTI_THREAD_QUEUE is defined.
procedure TIntegerQueue.UnLock;
begin
  {$IFDEF MULTI_THREAD_QUEUE}
  LeaveCriticalSection(Self.FCS);
  {$ENDIF}
end;

// Grows the queue storage so it can hold at least Value slots; requests that
// do not exceed the current capacity are ignored (the queue never shrinks).
// Slot storage is sized with SizeOf(Pointer) per slot, as in the original,
// and rounded up to a multiple of 4096 bytes. Takes no lock - callers hold it.
//
// Both new buffers are allocated before any field is changed. The original
// freed FTmpBuff first and left the stale pointer in place, so a failing
// GetMem led to a double free in the destructor.
procedure TIntegerQueue.Inernal_SetCapacity(const Value: Integer);
const
  PAGE_BYTES = 4096;
var
  PageCount, ByteSize, NewCapacity: Integer;
  NewBuff, NewTmp: Pointer;
begin
  if Value <= FCapacity then
    Exit;

  // Bytes needed, rounded up to whole pages.
  ByteSize := Value * SizeOf(Pointer);
  PageCount := ByteSize div PAGE_BYTES;
  if (ByteSize mod PAGE_BYTES) > 0 then
    Inc(PageCount);
  ByteSize := PageCount * PAGE_BYTES;
  NewCapacity := ByteSize div SizeOf(Pointer);

  // Allocate everything up front; on failure the queue state is untouched.
  GetMem(NewBuff, ByteSize);
  NewTmp := nil;
  try
    // The spare buffer is what Pop swaps in when it compacts the queue.
    if NewCapacity >= 2048 then
      GetMem(NewTmp, ByteSize);
  except
    FreeMem(NewBuff);
    raise;
  end;

  // Carry the existing slots over into the zero-filled new storage.
  FillChar(NewBuff^, ByteSize, #0);
  if FBuff <> nil then
  begin
    Move(FBuff^, NewBuff^, FCapacity * SizeOf(Pointer));
    FreeMem(FBuff);
  end;
  if FTmpBuff <> nil then
    FreeMem(FTmpBuff);

  FBuff := NewBuff;
  FTmpBuff := NewTmp;
  FCapacity := NewCapacity;
end;

// Removes and returns the oldest value. Returns 0 when the queue is empty.
function TIntegerQueue.Pop: Integer;
const
  // Number of consumed slots at the front that triggers a compaction.
  _MoveRange_ = 2048;

  // Copies the unread part of FBuff to the start of FTmpBuff, then swaps the
  // two buffers so the consumed front slots are reclaimed.
  procedure Compact();
  var
    Head: PInteger;
    Previous: Pointer;
  begin
    FillChar(FTmpBuff^, FCapacity * SizeOf(Pointer), #0);
    Head := PInteger(FBuff);
    Inc(Head, FPopIndex);
    Move(Head^, FTmpBuff^, (FCapacity - FPopIndex) * SizeOf(Pointer));
    Previous := FBuff;
    FBuff := FTmpBuff;
    FTmpBuff := Previous;
  end;

var
  Slot: PInteger;
begin
  Lock();
  try
    Result := 0;
    if FPopIndex <> FPushIndex then
    begin
      Slot := PInteger(FBuff);
      Inc(Slot, FPopIndex);
      Result := Slot^;
      Inc(FPopIndex);
      // Once _MoveRange_ slots at the front are free, move everything down.
      if FPopIndex = _MoveRange_ then
      begin
        Compact();
        FPopIndex := 0;
        Dec(FPushIndex, _MoveRange_);
      end;
    end;
  finally
    UnLock();
  end;
end;

// Appends AItem to the queue.
// Returns AItem on success, or 0 when MaxLength is set and already reached.
function TIntegerQueue.Push(AItem: Integer): Integer;
var
  Slot: PInteger;
  Queued: Integer;
begin
  Lock();
  try
    Result := 0;
    Queued := FPushIndex - FPopIndex; // current queue length
    if (FMaxLength > 0) and (Queued >= FMaxLength) then
      Exit; // length limit reached

    Slot := PInteger(FBuff);
    Inc(Slot, FPushIndex);
    Slot^ := AItem;
    Inc(FPushIndex);

    // Grow by 1024 slots as soon as the last slot has been used.
    if FPushIndex >= FCapacity then
      Inernal_SetCapacity(FCapacity + 1024);

    Result := AItem;
  finally
    UnLock();
  end;
end;

// Property setter for Capacity: grows the storage under the lock.
// Values that do not exceed the current capacity have no effect.
procedure TIntegerQueue.setCapacity(const Value: Integer);
begin
  Self.Lock();
  try
    Self.Inernal_SetCapacity(Value);
  finally
    Self.UnLock();
  end;
end;

// Property getter for Capacity: number of slots currently allocated.
function TIntegerQueue.getCapacity: Integer;
begin
  Lock();
  try
    Result := FCapacity;
  finally
    UnLock();
  end;
end;

// Property getter for Empty: TRUE when the push and pop indexes coincide.
function TIntegerQueue.getEmpty: Boolean;
begin
  Lock();
  try
    Result := (FPopIndex = FPushIndex);
  finally
    UnLock();
  end;
end;

// Property getter for Length: number of items currently queued.
function TIntegerQueue.getLength: Integer;
begin
  Lock();
  try
    Result := FPushIndex - FPopIndex;
  finally
    UnLock();
  end;
end;

 

{ THashFIFO }

// Stores Value under Key in an entry taken from NewBucket and links it at the
// head of its bucket chain. Existing entries with the same Key are not
// looked up or replaced.
// Returns the pool index of the entry used, or -1 when NewBucket returns nil.
function THashFIFO.Add(const Key: Int64; Value: Pointer): Integer;
var
  Slot: Integer;
  Item: PsfHashItem;
begin
  Item := NewBucket();
  if Item = nil then
  begin
    Result := -1; // no entry available
    Exit;
  end;

  Slot := HashOf(Key) mod Cardinal(Length(Buckets));
  Item^.Key := Key;
  Item^.Value := Value;
  Item^.Hash := Slot;
  Item^.Next := Buckets[Slot];
  Buckets[Slot] := Item;
  Result := Item^.Index;
end;

// Links an entry taken from NewBucket under Key while leaving its Value
// untouched, and returns that Value. Returns nil when NewBucket returns nil.
function THashFIFO.Bind(const Key: Int64): Pointer;
var
  Slot: Integer;
  Item: PsfHashItem;
begin
  Result := nil;
  Item := NewBucket();
  if Item = nil then
    Exit;

  Slot := HashOf(Key) mod Cardinal(Length(Buckets));
  Item^.Key := Key;
  Item^.Hash := Slot;
  Item^.Next := Buckets[Slot];
  Buckets[Slot] := Item;
  Result := Item^.Value;
end;

// Unlinks every entry from every bucket and hands each one to DisposeBucket,
// which returns it to the free stack.
procedure THashFIFO.Clear;
var
  Slot: Integer;
  Current, Following: PsfHashItem;
begin
  for Slot := Low(Buckets) to High(Buckets) do
  begin
    Current := Buckets[Slot];
    while Current <> nil do
    begin
      // Read the link first: DisposeBucket may touch the entry.
      Following := Current^.Next;
      DisposeBucket(Current);
      Current := Following;
    end;
    Buckets[Slot] := nil;
  end;
end;

// Builds a table with Size buckets and a pool of Size preallocated entries,
// all of which start on the free stack.
// pvInitValue: optional initial Value per pool entry; entries beyond its
// length get nil. The original indexed pvInitValue without a bounds check.
//
// Every entry is zero-filled after New. The original left Used, Hash and R
// uninitialised, so Pop could pass a garbage Hash to Remove for an entry
// that was handed out but never linked.
constructor THashFIFO.Create(Size: Cardinal; pvInitValue: TArrayOfPointer);
var
  Index: Integer;
  PH: PsfHashItem;
begin
  inherited Create;
  SetLength(Buckets, Size);
  for Index := Low(Buckets) to High(Buckets) do
    Buckets[Index] := nil;

  SetLength(FBucketStacks, Size);
  for Index := Low(FBucketStacks) to High(FBucketStacks) do
    FBucketStacks[Index] := nil;

  // Empty stack: the index sits one below the bottom position.
  FStackBottom := Low(Buckets);
  FStatckTop := High(Buckets);
  FStackIndex := FStackBottom;
  Dec(FStackIndex);
  // \\
  SetLength(BucketPool, Size);
  for Index := Low(BucketPool) to High(BucketPool) do
  begin
    New(PH);
    FillChar(PH^, SizeOf(TsfHashItem), 0); // Next=nil, Key=0, Used=0, Hash=0, R=0
    if Index < Length(pvInitValue) then
      PH^.Value := pvInitValue[Index]
    else
      PH^.Value := nil;
    PH^.Index := Index;
    BucketPool[Index] := PH;
    Push(PH);
  end;
  // \\
  // -1 = eviction cursor not positioned yet; Pop sets it on first use.
  FCurPopIndex := -1;
end;

// Clears the table (raising OnDisposeBucket for every linked entry) and then
// frees every pooled entry.
destructor THashFIFO.Destroy;
var
  I: Integer;
begin
  Clear;
  for I := Low(BucketPool) to High(BucketPool) do
    if BucketPool[I] <> nil then
      Dispose(BucketPool[I]);
  inherited Destroy;
end;

// Notifies OnDisposeBucket (when assigned), marks the entry as free and
// pushes it back onto the free stack. The entry's memory is not released.
procedure THashFIFO.DisposeBucket(Value: PsfHashItem);
begin
  if Assigned(FOnDisposeBucket) then
    FOnDisposeBucket(Self, Value);
  Value^.Used := 0; // free marker
  Push(Value);
end;

// Walks Key's bucket chain and returns the address of the link that points
// at the matching entry. When Key is absent the result is the address of the
// chain's terminating nil link, so Result^ = nil means "not found".
function THashFIFO.Find(const Key: Int64): PPsfHashItem;
var
  Slot: Integer;
begin
  Slot := HashOf(Key) mod Cardinal(Length(Buckets));
  Result := @Buckets[Slot];
  while Result^ <> nil do
  begin
    if Result^^.Key = Key then
      Break;
    Result := @Result^^.Next;
  end;
end;

// Property getter for FreeCount: number of entries on the free stack.
function THashFIFO.GetFreeCount: Integer;
begin
  Result := 0;
  if FStackIndex >= FStackBottom then
    Result := FStackIndex - FStackBottom + 1;
end;

// Hashes the bytes of Key in memory order: for each byte the running value
// is rotated left by two bits and XOR-ed with the byte.
function THashFIFO.HashOf(const Key: Int64): Cardinal;
var
  Cursor: PByte;
  Remaining: Integer;
begin
  Result := 0;
  Cursor := @Key;
  Remaining := SizeOf(Key);
  while Remaining > 0 do
  begin
    Result := ((Result shl 2) or (Result shr (SizeOf(Result) * 8 - 2))) xor Cursor^;
    Inc(Cursor);
    Dec(Remaining);
  end;
end;

// Replaces the Value stored under Key.
// Returns TRUE when Key was found, FALSE otherwise (nothing is added).
function THashFIFO.Modify(const Key: Int64; Value: Pointer): Boolean;
var
  Item: PsfHashItem;
begin
  Item := Find(Key)^;
  Result := Item <> nil;
  if Result then
    Item^.Value := Value;
end;

// Obtains an entry from Pop and marks it as in use. The entry is not linked
// into any bucket here.
function THashFIFO.NewBucket: PsfHashItem;
begin
  Result := Pop();
  if Result = nil then
    Exit;
  Result^.Used := $01;
end;

// Takes an entry from the free stack. While the stack is empty, the entry at
// the eviction cursor is removed from the table (Remove returns it to the
// stack via DisposeBucket) and the cursor steps down through the pool,
// wrapping to the end.
// NOTE(review): the loop only ends once a Remove call actually finds an
// entry; this assumes pool entries not on the free stack are linked into
// Buckets - confirm for entries obtained directly through NewBucket.
function THashFIFO.Pop: PsfHashItem;
var
  Victim: PsfHashItem;
begin
  while FStackIndex < FStackBottom do
  begin
    Victim := BucketPool[FCurPopIndex];
    if Victim <> nil then
      Remove(Victim^.Hash, Victim^.Key);
    Dec(FCurPopIndex);
    if FCurPopIndex < 0 then
      FCurPopIndex := High(FBucketStacks);
  end;

  Result := PsfHashItem(FBucketStacks[FStackIndex]);
  Dec(FStackIndex);
  // \\
  // First use: start the eviction cursor at the first entry handed out.
  if FCurPopIndex < 0 then
    FCurPopIndex := Result^.Index;
end;

// Puts Value on top of the free stack.
// Returns the stack position used, or -1 when the stack is already full.
function THashFIFO.Push(Value: PsfHashItem): Integer;
begin
  Result := -1;
  if FStackIndex >= FStatckTop then
    Exit;
  Inc(FStackIndex);
  FBucketStacks[FStackIndex] := Value;
  Result := FStackIndex;
end;

// Removes the entry with the given Key from bucket Hash (the bucket index is
// supplied by the caller instead of being recomputed) and disposes it.
// Does nothing when the bucket holds no such Key.
procedure THashFIFO.Remove(const Hash: Integer; const Key: Int64);
var
  Link: PPsfHashItem;
  Item: PsfHashItem;
begin
  // Find the link that points at the entry (or the terminating nil link).
  Link := @Buckets[Hash];
  while Link^ <> nil do
  begin
    if Link^^.Key = Key then
      Break;
    Link := @Link^^.Next;
  end;

  Item := Link^;
  if Item = nil then
    Exit;
  Link^ := Item^.Next;
  DisposeBucket(Item);
end;

// Removes the entry stored under Key, if any, and disposes it.
procedure THashFIFO.Remove(const Key: Int64);
var
  Link: PPsfHashItem;
  Item: PsfHashItem;
begin
  Link := Find(Key);
  Item := Link^;
  if Item = nil then
    Exit;
  // Unlink before disposing.
  Link^ := Item^.Next;
  DisposeBucket(Item);
end;

// Returns the Value stored under Key, or nil when Key is absent.
function THashFIFO.ValueOf(const Key: Int64): Pointer;
var
  Item: PsfHashItem;
begin
  Result := nil;
  Item := Find(Key)^;
  if Item <> nil then
    Result := Item^.Value;
end;

 

{ TCircleArrayOfNativeInt }

// Size: capacity of the ring in elements.
// AThreadSafe: when TRUE a critical section is created and Lock/UnLock use it.
constructor TCircleArrayOfNativeInt.Create(Size: Integer;AThreadSafe:Boolean);
begin
  FDataSize := Size;
  SetLength(FDataArray, Size);
  FReadPosition := Low(FDataArray);
  FWritePosition := Low(FDataArray);
  FOverWrap := FALSE;
  FDirty := FALSE;
  FThreadSafe := AThreadSafe;
  if FThreadSafe then
    InitializeCriticalSection(FCS);
end;

// Deletes the critical section of thread-safe instances.
destructor TCircleArrayOfNativeInt.Destroy;
begin
  if FThreadSafe then
    DeleteCriticalSection(FCS);
  inherited Destroy;
end;

// Property getter for DataCount: number of stored elements, read under the lock.
function TCircleArrayOfNativeInt.getDataCount: Integer;
begin
  Lock();
  try
    Result := FDataCount;
  finally
    UnLock();
  end;
end;

// Loads an instance from the file written by SaveToFile.
// Returns nil when AFileName does not exist.
class function TCircleArrayOfNativeInt.LoadfromFile(const AFileName: string):TCircleArrayOfNativeInt;
var
  Source: TFileStream;
begin
  Result := nil;
  if not FileExists(AFileName) then
    Exit;

  Source := TFileStream.Create(AFileName, fmOpenRead);
  try
    Result := LoadFromStream(Source);
  finally
    Source.Free();
  end;
end;

// Reads one marker byte, a TCircleArrayHeader and then up to DataSize
// NativeInt elements from AStream, and returns a new instance holding them.
// The marker byte is read but not validated, as before.
//
// Changes from the original:
// - elements are read until the array is full or the stream runs out; the
//   original stopped at the first value <= 0, dropping every later element
//   (zero and negative values are valid NativeInt data);
// - the owner identifier saved in the header is restored into FID;
// - the new instance is freed if reading raises.
class function TCircleArrayOfNativeInt.LoadFromStream(AStream: TStream):TCircleArrayOfNativeInt;
var
  H: TCircleArrayHeader;
  Index: Integer;
  CH: Ansichar;
  Value: NativeInt;
begin
  CH := #0;
  AStream.Read(CH, 1);
  Windows.ZeroMemory(@H, SizeOf(H));
  AStream.Read(H, SizeOf(H));
  //\\
  Result := TCircleArrayOfNativeInt.Create(H.DataSize, H.ThreadSafe);
  try
    Result.FOverWrap := H.OverWrap;
    Result.FDataCount := H.DataCount;
    Result.FWritePosition := H.WritePosition;
    Result.FReadPosition := H.ReadPosition;
    // H.ID and FID are both array[1..32] of Ansichar.
    Move(H.ID, Result.FID, SizeOf(Result.FID));
    for Index := Low(Result.FDataArray) to High(Result.FDataArray) do
    begin
      // A short read means the stream holds no further complete element.
      if AStream.Read(Value, SizeOf(Value)) <> SizeOf(Value) then
        Break;
      Result.FDataArray[Index] := Value;
    end;
  except
    Result.Free();
    raise;
  end;
end;

// Enters the critical section; a no-op for instances created with
// AThreadSafe = FALSE.
procedure TCircleArrayOfNativeInt.Lock;
begin
  if not FThreadSafe then
    Exit;
  EnterCriticalSection(FCS);
end;

// Hook called by Push before a slot is written: forwards the slot's current
// content and the incoming value to the OnElementFree handler, if one is set.
procedure TCircleArrayOfNativeInt.OnElementPush(const OldValue:NativeInt;
  var NewValue:NativeInt);
begin
  if not Assigned(FOnElementFree) then
    Exit;
  FOnElementFree(Self, OldValue, NewValue);
end;

// Removes the oldest element and stores it in Value.
// Returns FALSE (leaving Value untouched) when the buffer is empty; in that
// case the read/write positions and the wrap flag are reset.
// The dirty flag is raised before the lock is taken, as in the original.
function TCircleArrayOfNativeInt.Pop(var Value: NativeInt):Boolean;
begin
  FDirty := TRUE;
  Lock();
  try
    Result := FDataCount <> 0;
    if not Result then
    begin
      FReadPosition := 0;
      FWritePosition := 0;
      FOverWrap := FALSE;
    end
    else
    begin
      // The read position may sit one past the end; wrap it lazily here.
      if FReadPosition > High(FDataArray) then
        FReadPosition := Low(FDataArray);
      Value := FDataArray[FReadPosition];
      Inc(FReadPosition);
      Dec(FDataCount);
    end;
  finally
    UnLock();
  end;
end;

// Stores Value at the write position. When the buffer is already full the
// oldest element is overwritten and the read position moves on to the next
// oldest one.
// Returns the array index that was written, or -1 when the buffer has no
// capacity. (The original never assigned Result.)
//
// Fixes relative to the original:
// - FDataCount is now incremented on the wrap-around path as well, so an
//   element pushed after some pops is no longer lost;
// - the read position is moved only when the buffer is actually full, not
//   whenever the wrap flag happens to be set, which could skip live data.
// OnElementPush is still called for every push with the slot's old content.
function TCircleArrayOfNativeInt.Push(Value: NativeInt): Integer;
begin
  FDirty := TRUE;
  Lock();
  try
    if FDataSize <= 0 then
    begin
      Result := -1;
      Exit;
    end;

    // The write position may sit one past the end; wrap it before writing.
    if FWritePosition > High(FDataArray) then
    begin
      FOverWrap := TRUE;
      FWritePosition := Low(FDataArray);
    end;

    OnElementPush(FDataArray[FWritePosition], Value);
    FDataArray[FWritePosition] := Value;
    Result := FWritePosition;
    Inc(FWritePosition);

    if FDataCount < FDataSize then
      Inc(FDataCount)
    else
    begin
      // Buffer was full: the slot just written held the oldest element, so
      // the next oldest one is at the new write position.
      FReadPosition := FWritePosition;
      if FReadPosition > High(FDataArray) then
        FReadPosition := Low(FDataArray);
    end;
  finally
    UnLock();
  end;
end;

// Serialises the buffer into a memory stream and writes it to AFileName.
// TInnerMemoryStream writes only the bytes up to the stream position, so the
// 256 KB preallocation does not end up in the file.
// The stream is now released in a finally block; the original leaked it when
// SaveToStream or SaveToFile raised.
procedure TCircleArrayOfNativeInt.SaveToFile(const AFileName: string);
var
  MS: TInnerMemoryStream;
begin
  MS := TInnerMemoryStream.Create();
  try
    MS.Size := 1024 * 256;
    MS.Position := 0;
    Windows.ZeroMemory(MS.Memory, MS.Size);
    SaveToStream(MS);
    MS.SaveToFile(AFileName);
  finally
    MS.Free();
  end;
end;

// Writes the buffer to AStream as: one marker byte ($42), one
// TCircleArrayHeader, then the whole element array (FDataSize elements).
// Returns the number of bytes written. Clears the dirty flag afterwards.
//
// Fixes relative to the original:
// - the header is written once, with FileSize already filled in; the
//   original wrote it twice, so LoadFromStream read the second copy as
//   element data;
// - Result is now assigned;
// - the character literals use ASCII quotes;
// - an empty array is no longer indexed.
function TCircleArrayOfNativeInt.SaveToStream(AStream: TStream): Integer;
const
  CH: Byte = $42;
var
  H: TCircleArrayHeader;
  iSize: Integer;
begin
  Lock();
  try
    iSize := FDataSize * SizeOf(NativeInt);

    Windows.ZeroMemory(@H, SizeOf(H));
    H.R[1] := 'R';
    H.R[2] := 'R';
    H.FileSize := 1 + SizeOf(H) + iSize;
    H.DataSize := FDataSize;
    H.DataCount := FDataCount; // lock is already held
    H.WritePosition := FWritePosition;
    H.ReadPosition := FReadPosition;
    H.OverWrap := FOverWrap;
    H.WriteDate := Now();
    H.ThreadSafe := FThreadSafe;
    // Copies at most 31 characters so H.ID stays null-terminated.
    StrLCopy(PAnsichar(@H.ID),
      PAnsichar(@FID),
      SizeOf(H.ID) - 1);

    Result := AStream.Write(CH, 1);
    Inc(Result, AStream.Write(H, SizeOf(H)));
    if iSize > 0 then
      Inc(Result, AStream.Write(FDataArray[0], iSize));
  finally
    UnLock();
  end;
  FDirty := FALSE;
end;

// Leaves the critical section; a no-op for instances created with
// AThreadSafe = FALSE.
procedure TCircleArrayOfNativeInt.UnLock;
begin
  if not FThreadSafe then
    Exit;
  LeaveCriticalSection(FCS);
end;

(* 测试代码
procedure TForm1.Button1Click(Sender: TObject);
var
obj:TCircleArrayOfNativeInt;
Index,I:NativeInt;
begin
Memo1.Clear();

Obj := TCircleArrayOfNativeInt.Create(2,TRUE);

for Index := 1000 to 1000 + 88 do
begin
if Index = 1000 + 62 then
obj.Push(Index)
else
Obj.Push(Index);
end;

I := 0;
while(TRUE) do
begin
if not Obj.Pop(Index) then Break;
//if Index < 0 then break;
Memo1.Lines.Add(IntToStr(Index));
Inc(I);
if I > 3 then
Break;
end;
//Exit;
Memo1.Lines.Add('**********************');

Obj.Push(91011);
Obj.Push(91012);
Obj.Push(91013);
Obj.Push(91014);
//Obj.Push(91015);
//Obj.Push(91016);
//Obj.Push(91017);
while(TRUE) do
begin
if not Obj.Pop(Index) then Break;
Memo1.Lines.Add(IntToStr(Index));
end;

end;
*)


{ TInnerMemoryStream }

// Writes only the bytes in front of the current Position to Stream; nothing
// is written when Position is 0.
procedure TInnerMemoryStream.SaveToStream(Stream: TStream);
begin
  if Position = 0 then
    Exit;
  Stream.WriteBuffer(Memory^, Position);
end;

{ TFixThreadQueue }

// ASize: queue capacity in elements; values <= 0 are replaced by 1.
// The critical section is created with a spin count of 4000.
constructor TFixThreadQueue.Create(ASize: Integer);
var
  Capacity: Integer;
begin
  Capacity := ASize;
  if Capacity <= 0 then
    Capacity := 1;
  InitializeCriticalSectionAndSpinCount(FCS, 4000);
  InitQueue(Capacity);
  FSize := Capacity;
end;

// Frees the slot memory and deletes the critical section.
destructor TFixThreadQueue.Destroy;
begin
  FreeMem(Self.FMemPtr);
  DeleteCriticalSection(Self.FCS);
  inherited Destroy;
end;

// Property getter for DataCount: copies the current item counter into the
// result through an interlocked exchange; the queue lock is not taken.
function TFixThreadQueue.GetDataCount: Integer;
begin
  Windows.InterlockedExchange(Result, Self.FDataCount);
end;

// Allocates one zero-filled block holding QueueSize nodes and links them into
// a ring (the last node points back to the first). Both the push and the pop
// cursor start at the first node.
procedure TFixThreadQueue.InitQueue(QueueSize: Integer);
var
  TotalBytes, I: Integer;
  Node, Successor: PFixQueueNode;
begin
  TotalBytes := QueueSize * SizeOf(TFixQueueNode);
  GetMem(FMemPtr, TotalBytes);
  ZeroMemory(FMemPtr, TotalBytes);
  //\\
  // Chain every node to the one stored right after it.
  Node := FMemPtr;
  for I := 2 to QueueSize do
  begin
    Successor := Node;
    Inc(Successor);
    Node^.Next := Successor;
    Node := Successor;
  end;
  // Close the ring.
  Node^.Next := FMemPtr;
  //\\
  FPopNode := FMemPtr;
  FPushNode := FMemPtr;
end;

// Enters the queue's critical section.
procedure TFixThreadQueue.Lock;
begin
  EnterCriticalSection(Self.FCS);
end;

// Like Pop, but leaves the element in the queue: returns TRUE when the node
// at the pop cursor holds data, FALSE otherwise.
// NeedLock: pass FALSE only when the caller already holds the lock.
// NOTE(review): Data is a by-value parameter, so the value copied into it
// here never reaches the caller - confirm whether "var" was intended in the
// declaration.
function TFixThreadQueue.PickUp(Data: NativeInt;NeedLock:Boolean): Boolean;
begin
  if NeedLock then
    Lock();
  Result := (FPopNode^.Status = $01);
  if Result then
    Data := FPopNode^.Data;
  if NeedLock then
    UnLock();
end;

// Removes the oldest element into Data.
// Returns FALSE (Data untouched) when the node at the pop cursor is empty.
// NeedLock: pass FALSE only when the caller already holds the lock.
function TFixThreadQueue.Pop(var Data: NativeInt;NeedLock:Boolean): Boolean;
var
  Node: PFixQueueNode;
begin
  if NeedLock then
    Lock();
  Node := FPopNode;
  Result := (Node^.Status = $01);
  if Result then
  begin
    Data := Node^.Data;
    Node^.Status := $00; // slot is free again
    FPopNode := Node^.Next;
    InterlockedDecrement(FDataCount);
  end;
  if NeedLock then
    UnLock();
end;

// Appends Data to the queue.
// Returns FALSE when the node at the push cursor still holds data (queue full).
// NeedLock: pass FALSE only when the caller already holds the lock.
function TFixThreadQueue.Push(Data: NativeInt;NeedLock:Boolean): Boolean;
var
  Node: PFixQueueNode;
begin
  if NeedLock then
    Lock();
  Node := FPushNode;
  Result := (Node^.Status = $00);
  if Result then
  begin
    Node^.Data := Data;
    Node^.Status := $01; // slot now holds data
    FPushNode := Node^.Next;
    InterlockedIncrement(FDataCount);
  end;
  if NeedLock then
    UnLock();
end;

// Leaves the queue's critical section.
procedure TFixThreadQueue.UnLock;
begin
  LeaveCriticalSection(Self.FCS);
end;

{ TsfTimerWheel }

// Registers a timer in the first unused node.
// Returns the node index (to be passed to DeleteTimer) on success,
// -1 when every node is in use, -2 when ATimeOutSec exceeds MaxSecs.
// The timer's slot is (ATimeOutSec + current clock position) mod MaxSecs.
function TsfTimerWheel.AddTimer(UserData: Pointer; Tag: Integer;
  ATimeOutSec: DWORD): Integer;
var
  I: Integer;
begin
  if ATimeOutSec > FMaxSecCount then
  begin
    Result := -2;
    Exit;
  end;
  //\\
  Result := -1;
  Lock();
  try
    I := Low(FTimeNodes);
    while (Result < 0) and (I <= High(FTimeNodes)) do
    begin
      // Slot < 0 marks an unused node.
      if FTimeNodes[I].Slot < 0 then
      begin
        FTimeNodes[I].Slot := (ATimeOutSec + FClockPosition) mod FMaxSecCount;
        FTimeNodes[I].UserData := UserData;
        FTimeNodes[I].Tag := Tag;
        FTimeNodes[I].Index := I;
        FTimeNodes[I].TimeOutSec := ATimeOutSec;
        Result := I;
      end;
      Inc(I);
    end;
  finally
    UnLock();
  end;
end;

// AMaxSecCount: maximum timeout in seconds (size of the wheel).
// AMaxNodeCount: maximum number of timers.
// The thread is created suspended, all nodes are marked unused, and the
// thread is then resumed.
constructor TsfTimerWheel.Create(AMaxSecCount, AMaxNodeCount: DWORD);
var
  I: Integer;
  Node: PWTimeNode;
begin
  inherited Create(TRUE);
  FRunging := FALSE;
  FClockPosition := 0;
  FMaxSecCount := AMaxSecCount;
  InitializeCriticalSection(FCS);

  SetLength(FTimeNodes, AMaxNodeCount);
  for I := Low(FTimeNodes) to High(FTimeNodes) do
  begin
    Node := @FTimeNodes[I];
    Node^.Slot := -1; // unused
    Node^.Next := nil;
    Node^.UserData := nil;
    Node^.Tag := 0;
  end;

  Resume();
end;

// Cancels the timer stored at AIndex (the value returned by AddTimer) by
// resetting its node to the unused state.
// Returns FALSE when AIndex is outside the node array.
// Fix: the original tested "AIndex - 1 <= High(FTimeNodes)", which accepted
// AIndex = High + 1 and wrote one element past the end of the array.
function TsfTimerWheel.DeleteTimer(AIndex: Integer): Boolean;
begin
  Lock();
  try
    Result := (AIndex >= Low(FTimeNodes)) and (AIndex <= High(FTimeNodes));
    if Result then
    begin
      FTimeNodes[AIndex].Slot := -1;
      FTimeNodes[AIndex].Next := nil;
      FTimeNodes[AIndex].Index := -1;
      FTimeNodes[AIndex].TimeOutSec := 0;
      FTimeNodes[AIndex].UserData := nil;
      FTimeNodes[AIndex].Tag := 0;
    end;
  finally
    UnLock();
  end;
end;

// Stops the worker thread and then deletes the critical section.
// Fix: the original deleted the critical section BEFORE the inherited
// destructor had stopped the thread, so Execute could still call Lock/UnLock
// on a deleted critical section.
destructor TsfTimerWheel.Destroy;
begin
  // Ask Execute to leave its loop; the inherited TThread destructor waits
  // for the thread to finish.
  Terminate;
  inherited;
  // FCS is a field of this instance and stays valid until this destructor
  // returns, so it can be deleted once the thread is no longer running.
  DeleteCriticalSection(FCS);
end;

// Thread body. Roughly once per second it advances the clock position,
// gathers every node whose Slot equals that position into a linked list,
// passes the list to OnTimer, and then re-arms or releases each node.
// The lock is held for the whole tick, including the OnTimer call and the
// wait for AWorkDone.
procedure TsfTimerWheel.Execute;
var
Index:Integer;
ANode,FirstNode,TmpNode:PWTimeNode;
AWorkDone:Boolean;
begin
FRunging := TRUE;
while(not Terminated) do
begin
//Sleep(1000);
// Waits on this thread's own handle with a 1000 ms timeout.
// NOTE(review): presumably this always times out and so acts as the 1-second tick - confirm.
WaitForSingleObject(Handle,1000);
if Terminated then Break;
//if iRet = WAIT_TIMEOUT then
Lock();
try
// Advance the wheel by one position, wrapping at FMaxSecCount.
Inc(FClockPosition);
FClockPosition := FClockPosition mod FMaxSecCount;
ANode := nil;
FirstNode := nil;
// Chain all nodes due at this position; ANode tracks the list tail.
for Index := Low(FTimeNodes) to High(FTimeNodes) do
begin
if FTimeNodes[Index].Slot = FClockPosition then
begin
if ANode = nil then
begin
ANode := @FTimeNodes[Index];
ANode.Next := nil;
FirstNode := ANode;
end
else begin
ANode.Next := @FTimeNodes[Index];
ANode := ANode.Next;
ANode.Next := nil;
end;
end;
end;
//\\
if FirstNode <> nil then
begin
ANode := FirstNode;
// Defaults to done; OnTimer may set it to FALSE to signal pending async work.
AWorkDone := TRUE;
OnTimer(FirstNode,AWorkDone);
// Polls every 10 ms until AWorkDone is TRUE; Terminated is not checked here.
while(not AWorkDone) do Sleep(10);// wait 2017/06/19 14:52
//\\
// Walk the list again: re-arm nodes that still have a timeout, release the rest.
while(TRUE) do
begin
//2016/09/14 20:19
if ANode.TimeOutSec > 0 then // keep the timer running
ANode.Slot := (ANode.TimeOutSec + FClockPosition) mod FMaxSecCount
else
ANode.Slot := -1;// drop this timer; it will not fire again
TmpNode := ANode.Next;
ANode.Next := nil;
ANode := TmpNode;// ANode.Next;
if ANode = nil then Break;
end;
end;
finally
UnLock();
end;
end;
FRunging := FALSE;
end;

// Enters the wheel's critical section.
procedure TsfTimerWheel.Lock;
begin
  EnterCriticalSection(Self.FCS);
end;

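(* Usage example (signature updated to match the declaration, which also takes AWorkDone)
procedure TsfTimerWheel.OnTimer(AFirstNode: PWTimeNode; var AWorkDone: Boolean);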
var
PNode:PWTimeNode;
Msg:string;
begin
PNode := AFirstNode;
//doSomething
while(TRUE) do
begin
if PNode = nil then Break;
//doSomething
//Msg := formatDateTime('YYYY-MM-DD hh:mm:ss.zzz',PDateTime(PNode^.UserData)^) + ';Tag=' + IntToStr(PNode^.Tag) +
// ';Slot=' + IntToStr(PNode^.Slot);
//SendMessage(frmMain.Handle,WM_WRITE_LOG,WParam(@Msg),0);
//\\
PNode := PNode^.Next;//找到下个节点
end;
end;
*)

// Leaves the wheel's critical section.
procedure TsfTimerWheel.UnLock;
begin
  LeaveCriticalSection(Self.FCS);
end;

end.

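// Queue implementation 2011-06-26
// NOTE: everything from here on is a second, truncated copy of the unit's interface section. It follows the unit's final "end." above, so the compiler ignores it.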
unit sfContnrs;

interface

{.$DEFINE MULTI_THREAD_QUEUE} // 线程安全版本

{.$IFDEF MULTI_THREAD_QUEUE}
uses
Windows,classes,SysUtils;//,sfString;
{.$ENDIF}

type
// 2014-03-01: adjusted for 64-bit targets.
// FIFO queue of raw pointers stored in one flat, growable buffer.
// (Duplicate copy of the declaration; it follows the unit's final "end.".)
TsfQueue = class
private
// Number of slots currently allocated in FBuff.
FCapacity: Integer;
// Spare buffer used when Pop compacts the queue.
FTmpBuff: Pointer;
// Active slot storage.
FBuff: Pointer;
// Maximum number of queued items; 0 means no limit.
FMaxLength:DWORD;
FThreadSafe:Boolean;
{$IFDEF MULTI_THREAD_QUEUE}
FCS: TRTLCriticalSection;
{$ENDIF}
// \\
// Slot index written by the next Push.
FPushIndex: Integer;
// Slot index read by the next Pop.
FPopIndex: Integer;

procedure Inernal_SetCapacity(const Value: Integer);
// \\
procedure setCapacity(const Value: Integer);
function getCapacity: Integer;
function getEmpty: Boolean;
function getLength: Integer;
public //2015-07-12 protected==>public
procedure Lock();
procedure UnLock();
public
// pvMaxLength: maximum allowed queue length, 0 = unlimited
constructor Create(InitCapacity: Integer = 1024;pvMaxLength:DWORD=0);
constructor CreateNoThreadSafe(InitCapacity:Integer=1024;pvMaxLength:DWORD=0);
destructor Destroy(); override;
// \\
function Push(AItem: Pointer): Pointer; virtual;
function Pop(): Pointer; virtual;
procedure Clear();virtual;
public
property ThreadSefe:Boolean read FThreadSafe;// write FThreadSafe;
property Capacity: Integer read getCapacity write setCapacity;
property Empty: Boolean read getEmpty;
property Length: Integer read getLength;
property MaxLength:DWORD read FMaxLength;
end;

// Added for 64-bit compilation compatibility (2014-03-01 17:35).
// FIFO queue of Integer values.
// (Duplicate copy of the declaration; it follows the unit's final "end.".)
TIntegerQueue = class
private
// Number of slots currently allocated in FBuff.
FCapacity: Integer;
// Spare buffer used when Pop compacts the queue.
FTmpBuff: Pointer;
// Active slot storage.
FBuff: Pointer;
// Maximum number of queued items; 0 means no limit.
FMaxLength:DWORD;
{$IFDEF MULTI_THREAD_QUEUE}
FCS: TRTLCriticalSection;
{$ENDIF}
// Slot index written by the next Push.
FPushIndex: Integer;
// Slot index read by the next Pop.
FPopIndex: Integer;

procedure Inernal_SetCapacity(const Value: Integer);
// \\
procedure setCapacity(const Value: Integer);
function getCapacity: Integer;
function getEmpty: Boolean;
function getLength: Integer;
protected
procedure Lock();
procedure UnLock();
public
// pvMaxLength: maximum allowed queue length, 0 = unlimited
constructor Create(InitCapacity: Integer = 1024;pvMaxLength:DWORD=0);
destructor Destroy(); override;
// \\
function Push(AItem: Integer): Integer;
function Pop(): Integer; virtual;
procedure Clear();virtual;
public
property Capacity: Integer read getCapacity write setCapacity;
property Empty: Boolean read getEmpty;
property Length: Integer read getLength;
property MaxLength:DWORD read FMaxLength;
end;

// Hash table implementation 2015-02-05 10:43
TArrayOfPointer = array of Pointer;
PPsfHashItem = ^PsfHashItem;
PsfHashItem = ^TsfHashItem;

// One pooled hash-table entry.
TsfHashItem = record
Next: PsfHashItem; // next entry in the same bucket chain
Key: Int64;
Value: Pointer; // TCacheMemoryBlock;
Used: Integer; // set to $01 by NewBucket and to $00 by DisposeBucket
Index: Integer; // position of this entry in THashFIFO.BucketPool
Hash: Integer; // index of the bucket chain the entry was linked into
R: Integer; // padding for alignment
end;

// Event signature used by THashFIFO.OnDisposeBucket.
TEventOnDisposeBucket = procedure(Sender: TObject; Value: PsfHashItem) of object;

{
// First in, first out (FIFO)
First in, first out: a low-overhead algorithm that asks little of cache-object
management. A queue tracks all cached objects; the most recently used ones sit
at the back and the older ones at the front. When the cache is full, the
objects at the front are evicted and the new object is added. It is fast, but
it is not always a good fit.
}
// Fixed-size hash table keyed by Int64 whose entries come from a preallocated
// pool; free entries are kept on a stack.
THashFIFO = class //First in First out(FIFO)
private
FBucketStacks: array of PsfHashItem; // stack of free pool entries
FStackIndex: Integer; // index of the current top entry; FStackBottom - 1 when empty
FStatckTop: Integer; // highest valid stack position
FStackBottom: Integer; // lowest valid stack position
FCurPopIndex: Integer; // BucketPool index of the next entry to evict
FOnDisposeBucket: TEventOnDisposeBucket;
function Push(Value: PsfHashItem): Integer;
function Pop(): PsfHashItem;
function GetFreeCount: Integer; // when space runs out, the earliest data is deleted
function Bind(const Key: Int64): Pointer;
protected
Buckets: array of PsfHashItem; // heads of the bucket chains
BucketPool: array of PsfHashItem; // every entry owned by the table
function Find(const Key: Int64): PPsfHashItem;
function HashOf(const Key: Int64): Cardinal; virtual;
procedure DisposeBucket(Value: PsfHashItem); virtual;
procedure Clear;
procedure Remove(const Key: Int64); overload;
procedure Remove(const Hash: Integer; const Key: Int64); overload;
function Modify(const Key: Int64; Value: Pointer): Boolean;
public
// Size is used for the bucket array, the free stack and the pool alike.
// pvInitValue, when given, supplies the initial Value of each pool entry.
constructor Create(Size: Cardinal; pvInitValue: TArrayOfPointer = nil);
destructor Destroy; override;
function NewBucket(): PsfHashItem;
// Add returns the pool index of the entry used, or -1 when none is available.
function Add(const Key: Int64; Value: Pointer): Integer;
// ValueOf returns nil when the key is not present.
function ValueOf(const Key: Int64): Pointer;
public
property FreeCount: Integer read GetFreeCount; // number of free entries
property OnDisposeBucket: TEventOnDisposeBucket read FOnDisposeBucket
write FOnDisposeBucket;
end;

 

// 2015-02-06 13:51
// Circular array; the maximum number of elements must be set before use.
// If the maximum capacity is exceeded during use, the earliest pushed data is
// removed and the newly pushed data is stored in its place.
// OnElementPush fires before the data is stored, giving the handler a chance
// to process the element that is about to be dropped.
TElementFreeEvent = procedure(Sender:TObject;const OldValue:NativeInt;
var NewValue:NativeInt) of object;

// Circular array of NativeInt values with optional locking and the ability to
// save its state to, and load it from, a stream or file.
TCircleArrayOfNativeInt=class
private
FOverWrap:Boolean; // set by Push when the write position wraps; cleared by Pop on an empty array
FCS:TRTLCriticalSection; // only initialised and used when FThreadSafe is True
FDataArray:array of NativeInt; // element storage, FDataSize entries
FWritePosition:Integer; // index Push writes next
FReadPosition:Integer; // index Pop reads next
FDataCount:Integer; // number of stored elements
FDataSize:Integer; // capacity passed to Create
FOnElementFree:TElementFreeEvent;
FThreadSafe:Boolean;
FDirty:Boolean; // set by Push/Pop, cleared by SaveToStream
function getDataCount: Integer;
protected
// Called by Push with the value currently in the target slot and the new
// value; forwards to OnElementFree when a handler is assigned.
procedure OnElementPush(const OldValue:NativeInt;var NewValue:NativeInt);virtual;
public
FID:array[1..32] of Ansichar; // identifier of the data owner
constructor Create(Size:Integer;AThreadSafe:Boolean);
destructor Destroy();override;
procedure Lock();
procedure UnLock();
function Push(Value:NativeInt):Integer;
// Pop returns False and leaves Value untouched when the array is empty.
function Pop(var Value:NativeInt):Boolean;
procedure SaveToFile(const AFileName:string);
function SaveToStream(AStream:TStream):Integer;
// LoadfromFile returns nil when the file does not exist.
class function LoadfromFile(const AFileName:string):TCircleArrayOfNativeInt;
class function LoadFromStream(AStream:TStream):TCircleArrayOfNativeInt;
public
property Dirty:Boolean read FDirty;
property ThreadSafe:Boolean read FThreadSafe;
property DataCount:Integer read getDataCount;
property OverWrap:Boolean read FOverWrap;
property OnElementFree:TElementFreeEvent read FOnElementFree write FOnElementFree;
end;

// Header record that TCircleArrayOfNativeInt writes to and reads from a stream.
// The record is transferred as raw bytes, so field order and sizes define the
// stored layout.
TCircleArrayHeader=record
FileSize : Integer;
DataSize : Integer; // array capacity
DataCount : Integer; // number of stored elements
WritePosition : Integer;
ReadPosition : Integer;
WriteDate:TDateTime; // time at which the header was filled in
OverWrap:Boolean;
ThreadSafe:Boolean;
ID:array[1..32] of Ansichar; // identifier of the data owner
R:array[1..2] of Ansichar; // padding for alignment
end;

// Node of the ring used by TFixThreadQueue.
PFixQueueNode=^TFixQueueNode;
TFixQueueNode=record
Data:NativeInt; // payload
Status:Integer; // 0: free; 1: holds data
Next:PFixQueueNode; // following node in the ring
end;

// Fixed-length queue (thread-safe) 2015-07-24 09:30
// The nodes form a ring allocated once by InitQueue.
TFixThreadQueue=class
private
FCS:TRTLCriticalSection;
FSize:Integer; // queue capacity
FDataCount:Integer; // number of items currently in the queue
FPushNode:PFixQueueNode; // node the next Push writes
FPopNode:PFixQueueNode; // node the next Pop reads
FMemPtr:Pointer; // memory block holding all nodes
function GetDataCount: Integer;
procedure InitQueue(QueueSize:Integer); // initialise the queue
public
constructor Create(ASize:Integer=256); // by default 256 elements can be stored
destructor Destroy();override;
// Push returns False when the node at the write position still holds data.
// NeedLock = False skips the critical section.
function Push(Data:NativeInt;NeedLock:Boolean=TRUE):Boolean;
// Pop returns False when the node at the read position holds no data.
function Pop(var Data:NativeInt;NeedLock:Boolean=TRUE):Boolean;
// NOTE(review): Data is declared by value here (no var/out), unlike Pop.
function PickUp(Data:NativeInt;NeedLock:Boolean=TRUE):Boolean; // like Pop, but does not move the pointer 2017/05/01
procedure Lock();
procedure UnLock();
public
property Size:Integer read FSize; // queue capacity
property MemPtr:Pointer read FMemPtr;
property DataCount:Integer read GetDataCount; // number of items currently in the queue
end;


// Timer queue; thread-safe (2016/08/26 16:23)
// 2017/06/19 14:38: adjusted for 64-bit
PWTimeNode=^TWTimeNode;
TWTimeNode=record
Slot:Integer; // internal: clock position at which the timer fires, negative when free
Index:Integer; // internal: position of the node in FTimeNodes
TimeOutSec:DWORD; // timeout in seconds
Tag:Integer; // for the user
UserData:Pointer; // for the user
Next:PWTimeNode; // internal: next node in the list handed to OnTimer
end;

// Timer thread that advances a clock position about once per second and hands
// the timers due at that position to OnTimer.
TsfTimerWheel=class(TThread) // 2016/08/26 15:36, timer resolution 1 second
private
FClockPosition:Integer; // current position, kept in 0 .. FMaxSecCount - 1 by Execute
FMaxSecCount:DWORD; // upper limit for a timeout, in seconds
FCS:TRTLCriticalSection;//TCriticalSection;
FRunging:Boolean; // True while Execute is running
protected
FTimeNodes:array of TWTimeNode; //array[0..AMaxNodeCount - 1] of TWTimeNode
procedure Execute();override;

// Receives the linked list of nodes whose timer has expired (AFirstNode is the
// head of the list).
// After the callback returns, the thread waits until AWorkDone = True before
// it continues.
// The purpose of this parameter is to let OnTimer start asynchronous work and
// set AWorkDone = True once that work has finished. // added 2017/06/19
procedure OnTimer(AFirstNode:PWTimeNode;var AWorkDone:Boolean);virtual;abstract; // a usage demo is at the end of this file
public
// AMaxNodeCount: maximum number of timers handled;
// AMaxSecCount: maximum timeout in seconds; timer resolution is 1 second
constructor Create(AMaxSecCount:DWORD;AMaxNodeCount:DWORD);
destructor Destroy();override;
//\\
// >=0 success: returns the internal index (used to delete the timer)
// -1 the queue is full
// -2 the timeout exceeds the upper limit (seconds) ---- MaxSecs
function AddTimer(UserData:Pointer;Tag:Integer;ATimeOutSec:DWORD):Integer;
function DeleteTimer(AIndex:Integer):Boolean;
//\\
procedure Lock();
procedure UnLock();
public
property Runging:Boolean read FRunging;
property MaxSecs:DWORD read FMaxSecCount; // timeout upper limit (seconds)
end;

 

implementation

type
// Memory stream whose SaveToStream writes only the bytes in front of the
// current Position instead of the whole buffer.
TInnerMemoryStream=class(TMemoryStream)
public
procedure SaveToStream(Stream: TStream); override;
end;

{ TsfQueue }

// Empties the queue by rewinding both cursors; the buffers are kept as they are.
procedure TsfQueue.Clear;
begin
  Lock();
  try
    FPushIndex := 0;
    FPopIndex := 0;
  finally
    UnLock();
  end;
end;

// Builds a queue flagged as thread safe.
// InitCapacity: requested number of slots; values below 1024 are raised to 1024.
// pvMaxLength: maximum number of queued items, 0 = unlimited.
constructor TsfQueue.Create(InitCapacity: Integer;pvMaxLength:DWORD);
begin
  FThreadSafe := TRUE;
{$IFDEF MULTI_THREAD_QUEUE}
  if FThreadSafe then
    InitializeCriticalSection(FCS);
{$ENDIF}
  FMaxLength := pvMaxLength;

  if InitCapacity >= 1024 then
    Inernal_SetCapacity(InitCapacity)
  else
    Inernal_SetCapacity(1024);
end;

// Builds a queue flagged as not thread safe, so Lock/UnLock do nothing.
// InitCapacity: requested number of slots; values below 1024 are raised to 1024.
// pvMaxLength: maximum number of queued items, 0 = unlimited.
constructor TsfQueue.CreateNoThreadSafe(InitCapacity: Integer;
  pvMaxLength: DWORD);
begin
  FThreadSafe := FALSE;
{$IFDEF MULTI_THREAD_QUEUE}
  // Kept for symmetry with Create; the flag was just cleared, so this is skipped.
  if FThreadSafe then
    InitializeCriticalSection(FCS);
{$ENDIF}
  FMaxLength := pvMaxLength;

  if InitCapacity >= 1024 then
    Inernal_SetCapacity(InitCapacity)
  else
    Inernal_SetCapacity(1024);
end;

// Releases both buffers and, when it was created, the critical section.
destructor TsfQueue.Destroy;
begin
  FreeMem(FBuff);
  if Assigned(FTmpBuff) then
    FreeMem(FTmpBuff);
{$IFDEF MULTI_THREAD_QUEUE}
  if FThreadSafe then
    DeleteCriticalSection(FCS);
{$ENDIF}
  inherited;
end;

// Enters the critical section when MULTI_THREAD_QUEUE is defined and the queue
// is flagged thread safe; does nothing otherwise.
procedure TsfQueue.Lock;
begin
{$IFDEF MULTI_THREAD_QUEUE}
  if FThreadSafe then
    EnterCriticalSection(FCS);
{$ENDIF}
end;

// Leaves the critical section when MULTI_THREAD_QUEUE is defined and the queue
// is flagged thread safe; does nothing otherwise.
procedure TsfQueue.UnLock;
begin
{$IFDEF MULTI_THREAD_QUEUE}
  if FThreadSafe then
    LeaveCriticalSection(FCS);
{$ENDIF}
end;

// Grows the queue storage to hold at least Value slots. Does not take the lock;
// callers are expected to hold it. Requests that do not exceed the current
// capacity are ignored.
//
// The byte size is rounded up to a multiple of 4096 and the capacity is
// recomputed from the rounded size. Once the capacity reaches 2048 slots a
// second buffer of the same size is kept in FTmpBuff for Pop's compaction.
//
// Both new buffers are allocated before anything owned by the queue is
// released, so an allocation failure leaves the queue exactly as it was
// (previously a failure of the second GetMem left FTmpBuff pointing at FBuff,
// which made Destroy free the same block twice).
procedure TsfQueue.Inernal_SetCapacity(const Value: Integer);
const
  PAGE_BYTES = 4096;
var
  PageCount, ASize, NewCapacity: Integer;
  NewBuff, NewTmp: Pointer;
begin
  if Value <= FCapacity then
    Exit;

  // Number of bytes required, rounded up to whole pages.
  ASize := Value * SizeOf(Pointer);
  PageCount := ASize div PAGE_BYTES;
  if (ASize mod PAGE_BYTES) > 0 then
    Inc(PageCount);
  NewCapacity := (PageCount * PAGE_BYTES) div SizeOf(Pointer);

  GetMem(NewBuff, PageCount * PAGE_BYTES);
  NewTmp := nil;
  try
    if NewCapacity >= 2048 then
      GetMem(NewTmp, PageCount * PAGE_BYTES); // scratch buffer used by Pop
  except
    FreeMem(NewBuff);
    raise;
  end;

  // Move the existing slots into the zero-filled new buffer.
  FillChar(NewBuff^, PageCount * PAGE_BYTES, #0);
  if FBuff <> nil then
  begin
    Move(FBuff^, NewBuff^, FCapacity * SizeOf(Pointer));
    FreeMem(FBuff);
  end;
  if FTmpBuff <> nil then
    FreeMem(FTmpBuff);

  FBuff := NewBuff;
  FTmpBuff := NewTmp;
  FCapacity := NewCapacity;
end;

// Removes and returns the oldest item, or nil when the queue is empty.
//
// Slots are addressed through PPointer so that the stride and the stored width
// are SizeOf(Pointer). The previous PInteger access used a 4-byte stride and
// 4-byte values, which truncates pointers on 64-bit targets.
function TsfQueue.Pop: Pointer;

  // Copies the slots from FPopIndex onwards to the start of the spare buffer
  // and then swaps the spare buffer with the live one.
  procedure AdjuestMem();
  var
    pSrc: PPointer;
    pTmp: Pointer;
  begin
    FillChar(FTmpBuff^, FCapacity * SizeOf(Pointer), #0);
    pSrc := PPointer(FBuff);
    Inc(pSrc, FPopIndex);
    Move(pSrc^, FTmpBuff^, (FCapacity - FPopIndex) * SizeOf(Pointer));
    // swap the two buffers
    pTmp := FBuff;
    FBuff := FTmpBuff;
    FTmpBuff := pTmp;
  end;

const
  _MoveRange_ = 2048;

var
  P: PPointer;
begin
  Lock();
  try
    Result := nil;
    if (FPopIndex = FPushIndex) then
      Exit;
    P := PPointer(FBuff);
    Inc(P, FPopIndex);
    Result := P^;
    Inc(FPopIndex);
    // Once _MoveRange_ slots at the front have been consumed, shift the
    // remaining items back to the start of the buffer.
    if FPopIndex = _MoveRange_ then
    begin
      AdjuestMem();
      FPopIndex := 0;
      Dec(FPushIndex, _MoveRange_);
    end;
  finally
    UnLock();
  end;
end;

// Appends AItem to the queue.
// Returns AItem when it was stored and nil when MaxLength is set and already
// reached (note that pushing nil therefore looks like a rejection).
// The buffer grows by 1024 slots whenever the write index reaches the capacity.
function TsfQueue.Push(AItem: Pointer): Pointer;
var
  P: PPointer;
  Len: Integer;
begin
  Lock();
  try
    Result := nil;
    Len := Self.FPushIndex - Self.FPopIndex; // current queue length
    // Compare in Int64 so the signed length and the unsigned limit mix safely.
    if (FMaxLength > 0) and (Int64(Len) >= Int64(FMaxLength)) then
      Exit; // queue length limit reached
    P := PPointer(FBuff);
    Inc(P, FPushIndex);
    P^ := AItem;
    Inc(FPushIndex);
    if FPushIndex >= FCapacity then
    begin
      // grow by 1024 slots
      Inernal_SetCapacity(FCapacity + 1024);
    end;
    Result := AItem;
  finally
    UnLock();
  end;
end;

// Property setter for Capacity: grows the storage under the lock.
// Values that do not exceed the current capacity are ignored by
// Inernal_SetCapacity.
procedure TsfQueue.setCapacity(const Value: Integer);
begin
  Lock();
  try
    Self.Inernal_SetCapacity(Value);
  finally
    UnLock();
  end;
end;

// Property getter for Capacity: reads the slot count under the lock.
function TsfQueue.getCapacity: Integer;
begin
  Lock();
  try
    Result := FCapacity;
  finally
    UnLock();
  end;
end;

// Property getter for Empty: True when the read and write cursors coincide.
function TsfQueue.getEmpty: Boolean;
begin
  Lock();
  try
    Result := FPopIndex = FPushIndex;
  finally
    UnLock();
  end;
end;

// Property getter for Length: number of items between the two cursors.
function TsfQueue.getLength: Integer;
begin
  Lock();
  try
    Result := FPushIndex - FPopIndex;
  finally
    UnLock();
  end;
end;

{ TIntegerQueue }

// Empties the queue by rewinding both cursors; the buffers are kept as they are.
procedure TIntegerQueue.Clear;
begin
  Lock();
  try
    FPushIndex := 0;
    FPopIndex := 0;
  finally
    UnLock();
  end;
end;

// InitCapacity: requested number of slots; values below 1024 are raised to 1024.
// pvMaxLength: maximum number of queued items, 0 = unlimited.
constructor TIntegerQueue.Create(InitCapacity: Integer;pvMaxLength:DWORD);
begin
{$IFDEF MULTI_THREAD_QUEUE}
  InitializeCriticalSection(FCS);
{$ENDIF}
  FMaxLength := pvMaxLength;

  if InitCapacity >= 1024 then
    Inernal_SetCapacity(InitCapacity)
  else
    Inernal_SetCapacity(1024);
end;

// Releases both buffers and, when compiled in, the critical section.
destructor TIntegerQueue.Destroy;
begin
  FreeMem(FBuff);
  if Assigned(FTmpBuff) then
    FreeMem(FTmpBuff);
{$IFDEF MULTI_THREAD_QUEUE}
  DeleteCriticalSection(FCS);
{$ENDIF}
  inherited;
end;

// Enters the critical section; compiles to an empty body unless
// MULTI_THREAD_QUEUE is defined.
procedure TIntegerQueue.Lock;
begin
  {$IFDEF MULTI_THREAD_QUEUE}
  EnterCriticalSection(Self.FCS);
  {$ENDIF}
end;

// Leaves the critical section; compiles to an empty body unless
// MULTI_THREAD_QUEUE is defined.
procedure TIntegerQueue.UnLock;
begin
  {$IFDEF MULTI_THREAD_QUEUE}
  LeaveCriticalSection(Self.FCS);
  {$ENDIF}
end;

// Grows the queue storage to hold at least Value slots. Does not take the lock;
// callers are expected to hold it. Requests that do not exceed the current
// capacity are ignored.
//
// Sizes are computed with SizeOf(Pointer) per slot (as before), rounded up to
// a multiple of 4096 bytes, and the capacity is recomputed from the rounded
// size. Once the capacity reaches 2048 slots a second buffer of the same size
// is kept in FTmpBuff for Pop's compaction.
//
// Both new buffers are allocated before anything owned by the queue is
// released, so an allocation failure leaves the queue exactly as it was
// (previously a failure of the second GetMem left FTmpBuff pointing at FBuff,
// which made Destroy free the same block twice).
procedure TIntegerQueue.Inernal_SetCapacity(const Value: Integer);
const
  PAGE_BYTES = 4096;
var
  PageCount, ASize, NewCapacity: Integer;
  NewBuff, NewTmp: Pointer;
begin
  if Value <= FCapacity then
    Exit;

  // Number of bytes required, rounded up to whole pages.
  ASize := Value * SizeOf(Pointer);
  PageCount := ASize div PAGE_BYTES;
  if (ASize mod PAGE_BYTES) > 0 then
    Inc(PageCount);
  NewCapacity := (PageCount * PAGE_BYTES) div SizeOf(Pointer);

  GetMem(NewBuff, PageCount * PAGE_BYTES);
  NewTmp := nil;
  try
    if NewCapacity >= 2048 then
      GetMem(NewTmp, PageCount * PAGE_BYTES); // scratch buffer used by Pop
  except
    FreeMem(NewBuff);
    raise;
  end;

  // Move the existing slots into the zero-filled new buffer.
  FillChar(NewBuff^, PageCount * PAGE_BYTES, #0);
  if FBuff <> nil then
  begin
    Move(FBuff^, NewBuff^, FCapacity * SizeOf(Pointer));
    FreeMem(FBuff);
  end;
  if FTmpBuff <> nil then
    FreeMem(FTmpBuff);

  FBuff := NewBuff;
  FTmpBuff := NewTmp;
  FCapacity := NewCapacity;
end;

// Removes and returns the oldest value; returns 0 when the queue is empty.
// After 2048 values have been consumed from the front, the remaining slots are
// copied to the start of the spare buffer and the two buffers are swapped.
function TIntegerQueue.Pop: Integer;
const
  _MoveRange_ = 2048;
var
  Slot, Tail: PInteger;
  Spare: Pointer;
begin
  Lock();
  try
    Result := 0;
    if FPopIndex <> FPushIndex then
    begin
      Slot := PInteger(FBuff);
      Inc(Slot, FPopIndex);
      Result := Slot^;
      Inc(FPopIndex);

      if FPopIndex = _MoveRange_ then
      begin
        // Compact: copy everything from the read cursor onwards into the
        // cleared spare buffer.
        FillChar(FTmpBuff^, FCapacity * SizeOf(Pointer), #0);
        Tail := PInteger(FBuff);
        Inc(Tail, FPopIndex);
        Move(Tail^, FTmpBuff^, (FCapacity - FPopIndex) * SizeOf(Pointer));
        // The spare buffer becomes the live one and vice versa.
        Spare := FBuff;
        FBuff := FTmpBuff;
        FTmpBuff := Spare;
        // Rebase both cursors onto the compacted buffer.
        FPopIndex := 0;
        Dec(FPushIndex, _MoveRange_);
      end;
    end;
  finally
    UnLock();
  end;
end;

// Appends AItem to the queue.
// Returns AItem when it was stored and 0 when MaxLength is set and already
// reached. The buffer grows by 1024 slots whenever the write index reaches the
// capacity.
function TIntegerQueue.Push(AItem: Integer): Integer;
var
  Slot: PInteger;
  Queued: Integer;
begin
  Lock();
  try
    Queued := FPushIndex - FPopIndex; // current queue length
    if (FMaxLength > 0) and (Queued >= FMaxLength) then
    begin
      // length limit reached: reject
      Result := 0;
      Exit;
    end;

    Slot := PInteger(FBuff);
    Inc(Slot, FPushIndex);
    Slot^ := Integer(AItem);
    Inc(FPushIndex);

    // grow by 1024 slots once the write cursor hits the end
    if FPushIndex >= FCapacity then
      Inernal_SetCapacity(FCapacity + 1024);

    Result := AItem;
  finally
    UnLock();
  end;
end;

// Property setter for Capacity: grows the storage under the lock.
// Values that do not exceed the current capacity are ignored by
// Inernal_SetCapacity.
procedure TIntegerQueue.setCapacity(const Value: Integer);
begin
  Lock();
  try
    Self.Inernal_SetCapacity(Value);
  finally
    UnLock();
  end;
end;

// Property getter for Capacity: reads the slot count under the lock.
function TIntegerQueue.getCapacity: Integer;
begin
  Lock();
  try
    Result := FCapacity;
  finally
    UnLock();
  end;
end;

// Property getter for Empty: True when the read and write cursors coincide.
function TIntegerQueue.getEmpty: Boolean;
begin
  Lock();
  try
    Result := FPopIndex = FPushIndex;
  finally
    UnLock();
  end;
end;

// Property getter for Length: number of values between the two cursors.
function TIntegerQueue.getLength: Integer;
begin
  Lock();
  try
    Result := FPushIndex - FPopIndex;
  finally
    UnLock();
  end;
end;

 

{ THashFIFO }

// Stores Value under Key in an entry obtained from NewBucket and links the
// entry at the head of its bucket chain.
// Returns the pool index (Index field) of the entry, or -1 when NewBucket
// could not supply one.
// An existing entry with the same key is not looked up or replaced.
// (The earlier `Result := Hash` assignment was dead: it was always overwritten
// by the entry index before returning.)
function THashFIFO.Add(const Key: Int64; Value: Pointer): Integer;
var
  Hash: Integer;
  Bucket: PsfHashItem;
begin
  Bucket := NewBucket();
  if Bucket = nil then
  begin
    Result := -1; // no space left
    Exit;
  end;

  Hash := HashOf(Key) mod Cardinal(Length(Buckets));
  Bucket^.Key := Key;
  Bucket^.Value := Value;
  Bucket^.Hash := Hash;
  // push onto the front of the chain
  Bucket^.Next := Buckets[Hash];
  Buckets[Hash] := Bucket;
  Result := Bucket^.Index;
end;

// Links an entry obtained from NewBucket under Key without changing its Value,
// and returns that Value. Returns nil when no entry is available.
function THashFIFO.Bind(const Key: Int64): Pointer;
var
  Slot: Integer;
  Item: PsfHashItem;
begin
  Result := nil;
  Item := NewBucket();
  if Item = nil then
    Exit;

  Slot := HashOf(Key) mod Cardinal(Length(Buckets));
  Item^.Key := Key;
  Item^.Next := Buckets[Slot];
  Item^.Hash := Slot;
  Buckets[Slot] := Item;
  Result := Item^.Value;
end;

// Hands every linked entry to DisposeBucket and empties all bucket chains.
procedure THashFIFO.Clear;
var
  Slot: Integer;
  Current, Following: PsfHashItem;
begin
  for Slot := 0 to Length(Buckets) - 1 do
  begin
    Current := Buckets[Slot];
    while Current <> nil do
    begin
      // remember the successor before the entry is handed back
      Following := Current^.Next;
      DisposeBucket(Current);
      Current := Following;
    end;
    Buckets[Slot] := nil;
  end;
end;

// Creates a table with Size bucket chains and a pool of Size entries, all of
// which start on the free stack.
// pvInitValue: optional initial Value for each pool entry, by pool index.
//   Entries beyond the end of the array (or all of them when it is nil) start
//   with Value = nil; previously a short array was indexed out of range.
// Every field of a new entry is zeroed first, so Used, Hash and R no longer
// start with whatever New happened to return.
constructor THashFIFO.Create(Size: Cardinal; pvInitValue: TArrayOfPointer);
var
  Index: Integer;
  PH: PsfHashItem;
begin
  inherited Create;
  SetLength(Buckets, Size);
  for Index := Low(Buckets) to High(Buckets) do
    Buckets[Index] := nil;

  SetLength(FBucketStacks, Size);
  for Index := Low(FBucketStacks) to High(FBucketStacks) do
    FBucketStacks[Index] := nil;

  // Empty stack: the index sits one below the bottom position.
  FStackBottom := Low(Buckets);
  FStatckTop := High(Buckets);
  FStackIndex := FStackBottom - 1;

  SetLength(BucketPool, Size);
  for Index := Low(BucketPool) to High(BucketPool) do
  begin
    New(PH);
    FillChar(PH^, SizeOf(TsfHashItem), 0); // Next/Value = nil, Key/Used/Hash/R = 0
    if Index <= High(pvInitValue) then // High(nil) = -1, so nil is covered too
      PH^.Value := pvInitValue[Index];
    PH^.Index := Index;
    BucketPool[Index] := PH;
    Push(PH);
  end;

  // No eviction cursor yet; Pop sets it on first use.
  FCurPopIndex := -1;
end;

// Clears the table (which passes each linked entry through DisposeBucket) and
// then frees every entry of the pool.
destructor THashFIFO.Destroy;
var
  Slot: Integer;
begin
  Clear;
  for Slot := Low(BucketPool) to High(BucketPool) do
    if BucketPool[Slot] <> nil then
      Dispose(BucketPool[Slot]);
  inherited Destroy;
end;

// Returns an entry to the free stack: notifies OnDisposeBucket when a handler
// is assigned, marks the entry as free and pushes it.
procedure THashFIFO.DisposeBucket(Value: PsfHashItem);
begin
  if Assigned(FOnDisposeBucket) then
    FOnDisposeBucket(Self, Value);
  Value^.Used := $00; // free marker
  Push(Value);
end;

// Returns the address of the link that points at the entry for Key. When the
// key is absent, the returned link is the chain's terminating nil link, so
// Result^ is nil.
function THashFIFO.Find(const Key: Int64): PPsfHashItem;
begin
  Result := @Buckets[Integer(HashOf(Key) mod Cardinal(Length(Buckets)))];
  while Result^ <> nil do
  begin
    if Result^^.Key = Key then
      Break;
    Result := @Result^^.Next;
  end;
end;

// Number of entries currently on the free stack.
function THashFIFO.GetFreeCount: Integer;
begin
  Result := FStackIndex - FStackBottom + 1;
  if Result < 0 then
    Result := 0; // stack index below the bottom means nothing is free
end;

// Hashes the bytes of Key in memory order: for each byte the accumulator is
// rotated left by two bits and XORed with the byte.
function THashFIFO.HashOf(const Key: Int64): Cardinal;
const
  ROTATE_BITS = 2;
var
  Remaining: Integer;
  Cursor: PByte;
begin
  Result := 0;
  Cursor := @Key;
  Remaining := SizeOf(Key);
  while Remaining > 0 do
  begin
    Result := ((Result shl ROTATE_BITS) or
      (Result shr (SizeOf(Result) * 8 - ROTATE_BITS))) xor Cursor^;
    Inc(Cursor);
    Dec(Remaining);
  end;
end;

// Replaces the Value stored under Key. Returns False when the key is absent.
function THashFIFO.Modify(const Key: Int64; Value: Pointer): Boolean;
var
  Item: PsfHashItem;
begin
  Item := Find(Key)^;
  Result := Item <> nil;
  if Result then
    Item^.Value := Value;
end;

// Takes an entry via Pop and marks it as in use. Passes nil through unchanged.
function THashFIFO.NewBucket: PsfHashItem;
begin
  Result := Pop();
  if Assigned(Result) then
    Result^.Used := $01;
end;

// Takes an entry from the free stack. When the stack is empty, pool entries are
// evicted one at a time, starting at FCurPopIndex and walking downwards with
// wrap-around, until Remove returns one to the stack.
//
// Returns nil when the pool is empty or when a full pass over the pool freed
// nothing (for example because no pooled entry is linked into a bucket chain).
// The previous goto-based loop indexed BucketPool[-1] in the first case and
// never terminated in the second.
function THashFIFO.Pop: PsfHashItem;
var
  Victim: PsfHashItem;
  Attempts: Integer;
begin
  Result := nil;
  if Length(BucketPool) = 0 then
    Exit;

  Attempts := 0;
  while FStackIndex < FStackBottom do
  begin
    if Attempts >= Length(BucketPool) then
      Exit; // every pool entry was tried and none could be reclaimed
    Inc(Attempts);

    // Defensive: keep the eviction cursor inside the pool.
    if (FCurPopIndex < 0) or (FCurPopIndex > High(BucketPool)) then
      FCurPopIndex := High(BucketPool);

    Victim := BucketPool[FCurPopIndex];
    if Victim <> nil then
      Remove(Victim^.Hash, Victim^.Key); // on success this pushes an entry back

    Dec(FCurPopIndex);
    if FCurPopIndex < 0 then
      FCurPopIndex := High(FBucketStacks);
  end;

  Result := FBucketStacks[FStackIndex];
  Dec(FStackIndex);
  // The first entry ever handed out becomes the starting point for eviction.
  if FCurPopIndex < 0 then
    FCurPopIndex := Result^.Index;
end;

// Puts an entry on the free stack. Returns the stack position used, or -1 when
// the stack is already full.
function THashFIFO.Push(Value: PsfHashItem): Integer;
begin
  Result := -1;
  if FStackIndex >= FStatckTop then
    Exit;
  Inc(FStackIndex);
  FBucketStacks[FStackIndex] := Value;
  Result := FStackIndex;
end;

// Unlinks the first entry with the given Key from bucket chain number Hash and
// hands it to DisposeBucket. The caller supplies the chain index, so the key is
// not hashed again. Does nothing when the chain holds no such key.
procedure THashFIFO.Remove(const Hash: Integer; const Key: Int64);
var
  Link: PPsfHashItem;
  Item: PsfHashItem;
begin
  // Walk the chain through the addresses of its links.
  Link := @Buckets[Hash];
  while Link^ <> nil do
  begin
    if Link^^.Key = Key then
      Break;
    Link := @Link^^.Next;
  end;

  Item := Link^;
  if Item = nil then
    Exit;
  Link^ := Item^.Next;
  DisposeBucket(Item);
end;

// Unlinks the entry found for Key and hands it to DisposeBucket.
// Does nothing when the key is absent.
procedure THashFIFO.Remove(const Key: Int64);
var
  Link: PPsfHashItem;
  Item: PsfHashItem;
begin
  Link := Find(Key);
  Item := Link^;
  if Item = nil then
    Exit;
  Link^ := Item^.Next;
  DisposeBucket(Item);
end;

// Returns the Value stored under Key, or nil when the key is absent.
function THashFIFO.ValueOf(const Key: Int64): Pointer;
var
  Item: PsfHashItem;
begin
  Result := nil;
  Item := Find(Key)^;
  if Item <> nil then
    Result := Item^.Value;
end;

 

{ TCircleArrayOfNativeInt }

// Size: number of elements the array holds.
// AThreadSafe: when True a critical section is created and used by Lock/UnLock.
constructor TCircleArrayOfNativeInt.Create(Size: Integer;AThreadSafe:Boolean);
begin
  FDataSize := Size;
  SetLength(FDataArray, Size);
  FReadPosition := Low(FDataArray);
  FWritePosition := Low(FDataArray);
  FOverWrap := FALSE;
  FThreadSafe := AThreadSafe;
  if FThreadSafe then
    InitializeCriticalSection(FCS);
  FDirty := FALSE;
end;

// Deletes the critical section when one was created.
destructor TCircleArrayOfNativeInt.Destroy;
begin
  if FThreadSafe then
    DeleteCriticalSection(FCS);
  inherited;
end;

// Property getter for DataCount: reads the element count under the lock.
function TCircleArrayOfNativeInt.getDataCount: Integer;
begin
  Lock();
  try
    Result := FDataCount;
  finally
    UnLock();
  end;
end;

// Loads an array from AFileName through LoadFromStream.
// Returns nil when the file does not exist.
class function TCircleArrayOfNativeInt.LoadfromFile(const AFileName: string):TCircleArrayOfNativeInt;
var
  Source: TFileStream;
begin
  Result := nil;
  if not FileExists(AFileName) then
    Exit;

  Source := TFileStream.Create(AFileName, fmOpenRead);
  try
    Result := LoadFromStream(Source);
  finally
    Source.Free();
  end;
end;

// Rebuilds an array from a stream: one marker byte, a TCircleArrayHeader, then
// DataSize raw NativeInt elements.
//
// Changes compared with the previous version:
// - A header whose FileSize is 0 is treated as the first of two consecutive
//   header copies, and the second copy (which carries the real FileSize) is
//   read as well. Streams that start with such a pair used to have the second
//   copy read as element data.
// - Every element is restored. The loop used to stop at the first value <= 0,
//   dropping that value and everything stored after it.
// - Reading stops at a short read instead of reusing the previous value.
// - The owner ID stored in the header is copied into FID.
// - The half-built object is freed if reading raises.
//
// The marker byte is read but not validated. Elements are stored with the
// platform's SizeOf(NativeInt), so the data is only readable by a build with
// the same NativeInt size as the one that wrote it.
class function TCircleArrayOfNativeInt.LoadFromStream(AStream: TStream):TCircleArrayOfNativeInt;
var
  H, Second: TCircleArrayHeader;
  Index: Integer;
  CH: Ansichar;
  Value: NativeInt;
begin
  AStream.Read(CH, 1);
  Windows.ZeroMemory(@H, SizeOf(H));
  AStream.Read(H, SizeOf(H));
  if H.FileSize = 0 then
  begin
    // Only adopt the second copy when it could be read completely.
    if AStream.Read(Second, SizeOf(Second)) = SizeOf(Second) then
      H := Second;
  end;
  //\\
  Result := TCircleArrayOfNativeInt.Create(H.DataSize, H.ThreadSafe);
  try
    Result.FOverWrap := H.OverWrap;
    Result.FDataCount := H.DataCount;
    Result.FWritePosition := H.WritePosition;
    Result.FReadPosition := H.ReadPosition;
    Move(H.ID, Result.FID, SizeOf(Result.FID));
    for Index := Low(Result.FDataArray) to High(Result.FDataArray) do
    begin
      if AStream.Read(Value, SizeOf(Value)) <> SizeOf(Value) then
        Break; // stream ended early
      Result.FDataArray[Index] := Value;
    end;
  except
    Result.Free();
    raise;
  end;
end;

// Enters the critical section when the array was created thread safe;
// does nothing otherwise.
procedure TCircleArrayOfNativeInt.Lock;
begin
  if not FThreadSafe then
    Exit;
  EnterCriticalSection(FCS);
end;

// Hook called by Push before a slot is written. OldValue is the value currently
// in the slot; NewValue is the value about to be stored and may be changed.
// The default implementation forwards to OnElementFree when a handler is set.
procedure TCircleArrayOfNativeInt.OnElementPush(const OldValue:NativeInt;
  var NewValue:NativeInt);
begin
  if Assigned(FOnElementFree) then
    FOnElementFree(Self, OldValue, NewValue);
end;

// Reads the element at the read position into Value and advances the position.
// Returns False, leaves Value untouched and resets both positions and the
// OverWrap flag when the array holds no elements.
// The dirty flag is set on every call, before the lock is taken.
function TCircleArrayOfNativeInt.Pop(var Value: NativeInt):Boolean;
begin
  FDirty := TRUE;
  Lock();
  try
    Result := FDataCount <> 0;
    if Result then
    begin
      // The read position is wrapped lazily, just before it is used.
      if FReadPosition > High(FDataArray) then
        FReadPosition := Low(FDataArray);
      Value := FDataArray[FReadPosition];
      Inc(FReadPosition);
      Dec(FDataCount);
    end
    else
    begin
      FReadPosition := 0;
      FWritePosition := 0;
      FOverWrap := FALSE;
    end;
  finally
    UnLock();
  end;
end;

// Stores Value at the write position. When the array is already full the
// oldest element is overwritten and the read position moves on to the next
// oldest one.
//
// Returns the index of the slot that was written, or -1 when the array has no
// slots. (The function result used to be left unassigned.)
//
// OnElementPush is called before every store with the slot's current content
// and the new value, exactly as before; the handler may change the value.
//
// Fixes compared with the previous version:
// - A push that wrapped the write position did not increment the element
//   count, so an element pushed after a Pop could be lost.
// - Once OverWrap had been set, pushes into an array that was not full could
//   move the read position past elements that had not been read yet. The read
//   position now only moves when an unread element is really overwritten.
function TCircleArrayOfNativeInt.Push(Value: NativeInt): Integer;
var
  WasFull: Boolean;
begin
  Lock();
  try
    Result := -1;
    if Length(FDataArray) = 0 then
      Exit;

    FDirty := TRUE;

    // The write position is wrapped lazily, just before it is used.
    if FWritePosition > High(FDataArray) then
    begin
      FOverWrap := TRUE;
      FWritePosition := Low(FDataArray);
    end;

    WasFull := FDataCount >= FDataSize;

    OnElementPush(FDataArray[FWritePosition], Value);
    FDataArray[FWritePosition] := Value;
    Result := FWritePosition;
    Inc(FWritePosition);

    if WasFull then
      // The oldest element was just replaced; the next oldest sits right
      // after it. Pop wraps the read position when it runs past the end.
      FReadPosition := FWritePosition
    else
      Inc(FDataCount);
  finally
    UnLock();
  end;
end;

// Writes the array to AFileName.
// The state is first serialised into a memory stream preallocated to 256 KB
// and zero-filled; TInnerMemoryStream then saves only the bytes in front of the
// stream position, i.e. what SaveToStream actually wrote.
// The stream is now released in a finally block so it is not leaked when
// serialising or writing the file raises.
procedure TCircleArrayOfNativeInt.SaveToFile(const AFileName: string);
var
  MS: TInnerMemoryStream;
begin
  MS := TInnerMemoryStream.Create();
  try
    MS.Size := 1024 * 256;
    MS.Position := 0;
    Windows.ZeroMemory(MS.Memory, MS.Size);
    SaveToStream(MS);
    MS.SaveToFile(AFileName);
  finally
    MS.Free();
  end;
end;

// Serialises the array into AStream: one marker byte ($42), one
// TCircleArrayHeader, then all FDataSize elements as raw NativeInt values.
// Returns the number of bytes making up that image (the header's FileSize) and
// clears the dirty flag.
//
// Fixes compared with the previous version:
// - The 'R' padding characters were typed with typographic quotes, which is
//   not valid Pascal.
// - The header was written twice (the first copy with FileSize = 0) although
//   FileSize accounts for a single header; it is now written once, after
//   FileSize has been filled in.
// - The function result was never assigned.
// - The element data is only addressed when the array has elements.
//
// Elements are written with the platform's SizeOf(NativeInt), so the image is
// only readable by a build with the same NativeInt size.
function TCircleArrayOfNativeInt.SaveToStream(AStream: TStream): Integer;
const
  CH:Byte = $42;
var
  H: TCircleArrayHeader;
  iSize: Integer;
begin
  Lock();
  try
    Windows.ZeroMemory(@H, SizeOf(H));
    H.R[1] := 'R';
    H.R[2] := 'R';
    H.DataSize := FDataSize;
    H.DataCount := FDataCount;
    H.WritePosition := FWritePosition;
    H.ReadPosition := FReadPosition;
    H.OverWrap := FOverWrap;
    H.WriteDate := Now();
    H.ThreadSafe := FThreadSafe;
    // Copies at most 31 characters and always terminates with #0.
    StrLCopy(PAnsichar(@H.ID),
      PAnsichar(@FID),
      SizeOf(H.ID) - 1);

    iSize := FDataSize * SizeOf(NativeInt);
    H.FileSize := 1 + SizeOf(H) + iSize;

    AStream.Write(CH, 1);
    AStream.Write(H, SizeOf(H));
    if iSize > 0 then
      AStream.Write(FDataArray[0], iSize);

    Result := H.FileSize;
    FDirty := FALSE;
  finally
    UnLock();
  end;
end;

// Leaves the critical section when the array was created thread safe;
// does nothing otherwise.
procedure TCircleArrayOfNativeInt.UnLock;
begin
  if not FThreadSafe then
    Exit;
  LeaveCriticalSection(FCS);
end;

(* 测试代码
procedure TForm1.Button1Click(Sender: TObject);
var
obj:TCircleArrayOfNativeInt;
Index,I:NativeInt;
begin
Memo1.Clear();

Obj := TCircleArrayOfNativeInt.Create(2,TRUE);

for Index := 1000 to 1000 + 88 do
begin
if Index = 1000 + 62 then
obj.Push(Index)
else
Obj.Push(Index);
end;

I := 0;
while(TRUE) do
begin
if not Obj.Pop(Index) then Break;
//if Index < 0 then break;
Memo1.Lines.Add(IntToStr(Index));
Inc(I);
if I > 3 then
Break;
end;
//Exit;
Memo1.Lines.Add('**********************');

Obj.Push(91011);
Obj.Push(91012);
Obj.Push(91013);
Obj.Push(91014);
//Obj.Push(91015);
//Obj.Push(91016);
//Obj.Push(91017);
while(TRUE) do
begin
if not Obj.Pop(Index) then Break;
Memo1.Lines.Add(IntToStr(Index));
end;

end;
*)


{ TInnerMemoryStream }

// Writes the bytes from the start of the buffer up to the current Position.
// Writes nothing when Position is 0.
procedure TInnerMemoryStream.SaveToStream(Stream: TStream);
begin
  if Position = 0 then
    Exit;
  Stream.WriteBuffer(Memory^, Position);
end;

{ TFixThreadQueue }

// ASize: number of elements the queue can hold; values below 1 become 1.
// The critical section is created with a spin count of 4000.
constructor TFixThreadQueue.Create(ASize: Integer);
begin
  if ASize < 1 then
    ASize := 1;
  InitializeCriticalSectionAndSpinCount(FCS, 4000);
  InitQueue(ASize);
  FSize := ASize;
end;

// Frees the node memory and deletes the critical section.
destructor TFixThreadQueue.Destroy;
begin
  FreeMem(Self.FMemPtr);
  DeleteCriticalSection(Self.FCS);
  inherited;
end;

// Property getter for DataCount.
// Adding zero with InterlockedExchangeAdd returns the counter's current value
// through an interlocked operation on FDataCount itself. The previous
// InterlockedExchange(Result, FDataCount) applied the interlocked operation to
// the function result and read the shared counter with an ordinary load.
function TFixThreadQueue.GetDataCount: Integer;
begin
  Result := Windows.InterlockedExchangeAdd(FDataCount, 0);
end;

// Allocates one zeroed block holding QueueSize nodes and links them into a
// ring: each node points at the following one and the last node points back at
// the first. Both cursors start at the first node.
procedure TFixThreadQueue.InitQueue(QueueSize: Integer);
var
  ByteCount, Remaining: Integer;
  Node, Successor: PFixQueueNode;
begin
  ByteCount := QueueSize * SizeOf(TFixQueueNode);
  GetMem(FMemPtr, ByteCount);
  ZeroMemory(FMemPtr, ByteCount);

  Node := FMemPtr;
  Remaining := QueueSize - 1;
  while Remaining > 0 do
  begin
    Successor := Node;
    Inc(Successor);
    Node^.Next := Successor;
    Node := Successor;
    Dec(Remaining);
  end;
  // close the ring
  Node^.Next := FMemPtr;

  FPushNode := FMemPtr;
  FPopNode := FMemPtr;
end;

// Enters the queue's critical section.
procedure TFixThreadQueue.Lock;
begin
  EnterCriticalSection(Self.FCS);
end;

// Checks whether the node at the read position holds data, without consuming
// it. Returns True when it does.
// NeedLock = False skips the critical section.
// NOTE(review): Data is a by-value parameter, so the value copied into it below
// never reaches the caller; only the Boolean result is observable. Returning
// the value would require changing the parameter to var/out in the class
// declaration as well.
function TFixThreadQueue.PickUp(Data: NativeInt;NeedLock:Boolean): Boolean;
begin
  if NeedLock then
    Lock();
  try
    Result := FPopNode^.Status = $01;
    if Result then
      Data := FPopNode^.Data;
  finally
    if NeedLock then
      UnLock();
  end;
end;

// Takes the value from the node at the read position, marks the node free and
// moves on to the next node. Returns False, leaving Data untouched, when that
// node holds no data.
// NeedLock = False skips the critical section.
function TFixThreadQueue.Pop(var Data: NativeInt;NeedLock:Boolean): Boolean;
var
  Node: PFixQueueNode;
begin
  if NeedLock then
    Lock();
  try
    Node := FPopNode;
    Result := Node^.Status = $01;
    if Result then
    begin
      Data := Node^.Data;
      Node^.Status := $00;
      FPopNode := Node^.Next;
      InterlockedDecrement(FDataCount);
    end;
  finally
    if NeedLock then
      UnLock();
  end;
end;

// Stores Data in the node at the write position, marks the node occupied and
// moves on to the next node. Returns False when that node still holds data,
// i.e. the ring is full.
// NeedLock = False skips the critical section.
function TFixThreadQueue.Push(Data: NativeInt;NeedLock:Boolean): Boolean;
var
  Node: PFixQueueNode;
begin
  if NeedLock then
    Lock();
  try
    Node := FPushNode;
    Result := Node^.Status = $00;
    if Result then
    begin
      Node^.Data := Data;
      Node^.Status := $01;
      FPushNode := Node^.Next;
      InterlockedIncrement(FDataCount);
    end;
  finally
    if NeedLock then
      UnLock();
  end;
end;

// Leaves the queue's critical section.
procedure TFixThreadQueue.UnLock;
begin
  LeaveCriticalSection(Self.FCS);
end;

{ TsfTimerWheel }

// Registers a timer in the first free node (a node whose Slot is negative).
// UserData, Tag: stored in the node for the caller's use.
// ATimeOutSec:   timeout in seconds.
// Returns the node index on success, -1 when no node is free and -2 when
// ATimeOutSec exceeds the configured maximum.
function TsfTimerWheel.AddTimer(UserData: Pointer; Tag: Integer;
  ATimeOutSec: DWORD): Integer;
var
  Index: Integer;
begin
  if ATimeOutSec > FMaxSecCount then
  begin
    Result := -2;
    Exit;
  end;

  Result := -1;
  Lock();
  try
    Index := Low(FTimeNodes);
    while (Result < 0) and (Index <= High(FTimeNodes)) do
    begin
      if FTimeNodes[Index].Slot < 0 then
      begin
        // Clock position at which the timer becomes due.
        FTimeNodes[Index].Slot := (ATimeOutSec + FClockPosition) mod FMaxSecCount;
        FTimeNodes[Index].UserData := UserData;
        FTimeNodes[Index].Tag := Tag;
        FTimeNodes[Index].Index := Index;
        FTimeNodes[Index].TimeOutSec := ATimeOutSec;
        Result := Index;
      end;
      Inc(Index);
    end;
  finally
    UnLock();
  end;
end;

// AMaxSecCount:  maximum timeout in seconds; 0 is treated as 1 because the
//                value is used as a divisor (`mod`) by AddTimer and Execute.
// AMaxNodeCount: number of timers that can be registered at the same time.
//
// The thread is created with CreateSuspended = False and no longer resumed
// from inside this constructor. TThread then starts it only after construction
// has completed, so Execute cannot call the abstract OnTimer, or touch fields,
// before a descendant's constructor has finished.
// Every field of every node is now initialised explicitly.
constructor TsfTimerWheel.Create(AMaxSecCount, AMaxNodeCount: DWORD);
var
  Index: Integer;
begin
  inherited Create(FALSE);
  FRunging := FALSE;
  if AMaxSecCount = 0 then
    AMaxSecCount := 1;
  FMaxSecCount := AMaxSecCount;
  SetLength(FTimeNodes, AMaxNodeCount);
  for Index := Low(FTimeNodes) to High(FTimeNodes) do
  begin
    FTimeNodes[Index].Slot := -1; // negative slot = free node
    FTimeNodes[Index].Index := -1;
    FTimeNodes[Index].TimeOutSec := 0;
    FTimeNodes[Index].Tag := 0;
    FTimeNodes[Index].UserData := nil;
    FTimeNodes[Index].Next := nil;
  end;
  InitializeCriticalSection(FCS);
  FClockPosition := 0;
end;

// Frees the timer node with the index returned by AddTimer.
// Returns False when AIndex is outside the node array.
// The upper bound used to be tested as `AIndex - 1 <= High(FTimeNodes)`, which
// accepted High + 1 and wrote one element past the end of the array.
function TsfTimerWheel.DeleteTimer(AIndex: Integer): Boolean;
begin
  Lock();
  try
    Result := (AIndex >= Low(FTimeNodes)) and (AIndex <= High(FTimeNodes));
    if Result then
    begin
      FTimeNodes[AIndex].Slot := -1; // negative slot = free node
      FTimeNodes[AIndex].Next := nil;
      FTimeNodes[AIndex].Index := -1;
      FTimeNodes[AIndex].TimeOutSec := 0;
      FTimeNodes[AIndex].UserData := nil;
      FTimeNodes[AIndex].Tag := 0;
    end;
  finally
    UnLock();
  end;
end;

// Stops the thread first and deletes the critical section afterwards.
// TThread.Destroy terminates a running thread and waits for it, and Execute
// uses FCS until it returns, so the critical section has to outlive the
// inherited destructor. The instance memory is still valid after `inherited`
// returns; it is released only when this outermost destructor finishes.
destructor TsfTimerWheel.Destroy;
begin
  inherited;
  DeleteCriticalSection(FCS);
end;

// Thread body. Each pass waits up to 1000 ms, advances the clock position by
// one (modulo FMaxSecCount), links every node whose Slot equals the new
// position into a list, passes the list to OnTimer, and afterwards re-arms or
// frees each node of the list. Everything after the wait runs with the lock
// held, including the OnTimer call and the wait for AWorkDone.
procedure TsfTimerWheel.Execute;
var
Index:Integer;
ANode,FirstNode,TmpNode:PWTimeNode;
AWorkDone:Boolean;
begin
FRunging := TRUE;
while(not Terminated) do
begin
//Sleep(1000);
// Waits on this thread's own handle with a 1000 ms timeout.
WaitForSingleObject(Handle,1000);
if Terminated then Break;
//if iRet = WAIT_TIMEOUT then
Lock();
try
Inc(FClockPosition);
FClockPosition := FClockPosition mod FMaxSecCount;
ANode := nil;
FirstNode := nil;
// Collect the due nodes into a singly linked list, in array order.
for Index := Low(FTimeNodes) to High(FTimeNodes) do
begin
if FTimeNodes[Index].Slot = FClockPosition then
begin
if ANode = nil then
begin
// first due node becomes the head of the list
ANode := @FTimeNodes[Index];
ANode.Next := nil;
FirstNode := ANode;
end
else begin
// append to the tail of the list
ANode.Next := @FTimeNodes[Index];
ANode := ANode.Next;
ANode.Next := nil;
end;
end;
end;
//\\
if FirstNode <> nil then
begin
ANode := FirstNode;
// AWorkDone starts as TRUE; OnTimer sets it to FALSE to make this thread wait.
AWorkDone := TRUE;
OnTimer(FirstNode,AWorkDone);
// NOTE(review): this wait does not check Terminated, so it only ends when
// AWorkDone becomes True.
while(not AWorkDone) do Sleep(10); // wait 2017/06/19 14:52
//\\
// Walk the list again: re-arm or free each node and unlink it.
while(TRUE) do
begin
//2016/09/14 20:19
if ANode.TimeOutSec > 0 then // keep the timer running
ANode.Slot := (ANode.TimeOutSec + FClockPosition) mod FMaxSecCount
else
ANode.Slot := -1; // drop this timer; it will not fire again
TmpNode := ANode.Next;
ANode.Next := nil;
ANode := TmpNode;// ANode.Next;
if ANode = nil then Break;
end;
end;
finally
UnLock();
end;
end;
FRunging := FALSE;
end;

// Enters the timer wheel's critical section.
procedure TsfTimerWheel.Lock;
begin
  EnterCriticalSection(Self.FCS);
end;

(* 使用方法演示
procedure TsfTimerWheel.OnTimer(AFirstNode: PWTimeNode;var AWorkDone:Boolean);
var
PNode:PWTimeNode;
Msg:string;
begin
PNode := AFirstNode;
//doSomething
while(TRUE) do
begin
if PNode = nil then Break;
//doSomething
//Msg := formatDateTime('YYYY-MM-DD hh:mm:ss.zzz',PDateTime(PNode^.UserData)^) + ';Tag=' + IntToStr(PNode^.Tag) +
// ';Slot=' + IntToStr(PNode^.Slot);
//SendMessage(frmMain.Handle,WM_WRITE_LOG,WParam(@Msg),0);
//\\
PNode := PNode^.Next;//找到下个节点
end;
end;
*)

// Leaves the timer wheel's critical section.
procedure TsfTimerWheel.UnLock;
begin
  LeaveCriticalSection(Self.FCS);
end;

end.

线程安全队列

标签:name   status   lin   dirty   rem   safe   64位系统   while   virt   

原文地址:https://www.cnblogs.com/hnxxcxg/p/8595044.html

(0)
(0)
   
举报
评论 一句话评论(0)
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!