Distributed Systems: From Theory to Practice

As a software system grows, a single server's capacity eventually runs out. This is exactly where distributed systems come in. A distributed system is multiple computers (nodes) communicating over a network and behaving as a single system.

In this article we'll take a detailed look at the core concepts of distributed systems, the challenges they raise, and the techniques used to address them.


1. What Is a Distributed System?

A distributed system is a set of independent computers (nodes) that communicate over a network and appear to the user as a single, coherent system.

Core Properties

  • Concurrency: Nodes run at the same time
  • No Global Clock: Each node has its own clock
  • Independent Failures: Nodes can fail independently of one another
  • Heterogeneity: Different hardware and software can be mixed

Example Distributed Systems

  • Google Search (indexing across thousands of servers)
  • Amazon (microservice architecture)
  • Bitcoin (blockchain)
  • Netflix (content delivery network - CDN)

2. Core Challenges of Distributed Systems

2.1 Partial Failures

One of the biggest challenges: one node fails while the others keep running.

[System]
├── [Node 1] ✓ Running
├── [Node 2] ✗ Crashed (but the network is still up)
├── [Node 3] ✓ Running
└── [Node 4] ? Not responding (crashed? or a network problem?)
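The ambiguity around Node 4 is fundamental: from the outside, a crashed node and an unreachable node look identical. A minimal sketch of a timeout-based failure detector (all names here are hypothetical, not a real library) shows why such a detector can only ever *suspect* a node, never prove it is dead:

```python
import time

class SuspicionBasedDetector:
    """Marks nodes as SUSPECTED after a heartbeat timeout.
    It cannot distinguish a crashed node from a slow network."""

    def __init__(self, timeout=1.0):
        self.timeout = timeout
        self.last_seen = {}  # node_id -> time of last heartbeat

    def heartbeat(self, node_id):
        self.last_seen[node_id] = time.time()

    def status(self, node_id):
        last = self.last_seen.get(node_id)
        if last is None:
            return "UNKNOWN"
        if time.time() - last > self.timeout:
            # Could be a crash OR a partition -- we can only suspect
            return "SUSPECTED"
        return "ALIVE"

detector = SuspicionBasedDetector(timeout=0.2)
detector.heartbeat("node4")
time.sleep(0.3)                  # node4 goes silent past the timeout
print(detector.status("node4"))  # SUSPECTED, not "dead"
```

Real systems (e.g. Raft's election timeout below) build on exactly this kind of suspicion: act on the timeout, but be prepared for the "dead" node to come back.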

2.2 Time and Ordering Problems

Different machines can have different clocks:

# Node 1 clock: 10:00:00.100
# Node 2 clock: 09:59:59.900 (100 ms behind)

# Which event happened first?
event1 = "user registered on Node 1"   # 10:00:00.150
event2 = "email sent on Node 2"        # Node 2 clock reads 10:00:00.050
# Node 2's clock is 100 ms behind, so the true time is 10:00:00.150 -
# event2 LOOKS earlier than event1, but the timestamps cannot settle it

2.3 Network Problems

The network is not always reliable:

  • Packet loss
  • Latency
  • Network partition: the network splits into two groups that cannot see each other

3. The CAP Theorem (Brewer's Theorem)

The most fundamental theorem of distributed systems: only two of the following three properties can be guaranteed at the same time.

Property             Description
Consistency          Every read sees the most recent write
Availability         Every request gets a response (even if the operation fails)
Partition Tolerance  The system keeps working despite network partitions

CAP Combinations

        CAP Theorem
           /|\
          / | \
         /  |  \
        /   |   \
     CP     |     AP
  (Banking) | (Social Media)
 Consistency  Availability
      +            +
  Partition    Partition
  Tolerance    Tolerance

      (CA is impossible in practice!)

Examples

# CP System (Consistency + Partition Tolerance) - Banking
class BankAccount:
    def transfer(self, amount):
        # Must always be consistent
        with distributed_lock(self.account_id):
            if self.balance >= amount:
                self.balance -= amount
                return True
        return False
    # Fails during a network partition (not available)

# AP System (Availability + Partition Tolerance) - Social Media
class SocialMediaPost:
    def like(self):
        # Consistency is not critical; always respond
        self.likes_count += 1  # Eventual consistency
        return "Liked!"
    # Different nodes may temporarily show different like counts

4. Consistency Models

4.1 Strong Consistency

All nodes see the same data at the same time.

# Via consensus algorithms such as Raft or Paxos
def write_with_consensus(key, value):
    # Collect acknowledgements from a majority of nodes
    majority = len(nodes) // 2 + 1
    acks = 0

    for node in nodes:
        try:
            node.write(key, value)
            acks += 1
            if acks >= majority:
                return "OK"
        except Exception:
            continue

    raise Exception("Consensus failed")

4.2 Eventual Consistency

Given enough time, all nodes converge to the same data.

# Dynamo-style (Amazon DynamoDB)
class EventuallyConsistentDB:
    def write(self, key, value):
        # Write to every node asynchronously; don't wait
        for node in self.nodes:
            self.queue.put((node, key, value))
        return "OK (async)"

    def read(self, key):
        # Read from every node, take the freshest timestamp
        results = []
        for node in self.nodes:
            results.append(node.read(key))

        # Return the value with the highest timestamp
        return max(results, key=lambda x: x.timestamp)
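The read path above implements last-write-wins (LWW): the highest timestamp wins. It's worth seeing what that costs — under concurrent writes (or clock skew), one of two conflicting updates is silently dropped. A minimal self-contained sketch (the `Replica` class is hypothetical, kept just small enough to show the effect):

```python
from dataclasses import dataclass

@dataclass
class Versioned:
    value: str
    timestamp: float

class Replica:
    """Hypothetical replica that stores one timestamped value per key."""
    def __init__(self):
        self.data = {}

    def write(self, key, value, timestamp):
        self.data[key] = Versioned(value, timestamp)

    def read(self, key):
        return self.data.get(key)

# Two replicas accept concurrent writes to the same key
r1, r2 = Replica(), Replica()
r1.write('cart', 'add item A', timestamp=100.0)
r2.write('cart', 'add item B', timestamp=100.2)  # slightly "later" clock

# LWW read: the highest timestamp wins
results = [r.read('cart') for r in (r1, r2)]
winner = max(results, key=lambda v: v.timestamp)
print(winner.value)  # 'add item B' -- the 'add item A' write is lost
```

Systems that cannot afford to lose either update (shopping carts are the classic example) keep both versions and merge them instead, which is why Dynamo also supports vector-clock-style conflict detection.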

4.3 Read-Your-Writes Consistency

A user immediately sees their own writes.

class SessionConsistency:
    def __init__(self):
        self.session_writes = {}

    def write(self, session_id, key, value):
        # Perform the write
        node = self.route_to_node(key)
        node.write(key, value)

        # Remember which node this session wrote to
        if session_id not in self.session_writes:
            self.session_writes[session_id] = {}
        self.session_writes[session_id][key] = node

    def read(self, session_id, key):
        # First check whether this session wrote this key
        if session_id in self.session_writes and key in self.session_writes[session_id]:
            # Read from the same node
            node = self.session_writes[session_id][key]
            return node.read(key)
        else:
            # Otherwise load-balance the read
            node = self.route_to_node(key)
            return node.read(key)

5. Consensus Algorithms

Algorithms that let a group of nodes agree on a single decision.

5.1 Paxos (the classic)

# Simplified Paxos
class PaxosNode:
    def __init__(self, node_id):
        self.node_id = node_id
        self.proposed_value = None
        self.accepted_value = None
        self.promised_round = 0

    def prepare(self, round_num):
        # Prepare phase: have we already promised a higher round?
        if round_num > self.promised_round:
            self.promised_round = round_num
            return ("PROMISE", self.accepted_value)
        return ("REJECT", None)

    def accept(self, round_num, value):
        # Accept phase: does this round honor our promise?
        if round_num >= self.promised_round:
            self.accepted_value = value
            return "ACCEPTED"
        return "RE JECTED"
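The two steps in the comments above can be driven by a minimal proposer. A hedged sketch — the simplified acceptor is repeated inline so the snippet runs standalone, and real Paxos details such as competing proposers and retries are omitted. One safety rule is included because it is essential: if any acceptor has already accepted a value, the proposer must propose that value instead of its own.

```python
class PaxosNode:
    # Same simplified acceptor as above, repeated so this sketch is self-contained
    def __init__(self, node_id):
        self.node_id = node_id
        self.accepted_value = None
        self.promised_round = 0

    def prepare(self, round_num):
        if round_num > self.promised_round:
            self.promised_round = round_num
            return ("PROMISE", self.accepted_value)
        return ("REJECT", None)

    def accept(self, round_num, value):
        if round_num >= self.promised_round:
            self.accepted_value = value
            return "ACCEPTED"
        return "REJECTED"

def propose(acceptors, round_num, value):
    majority = len(acceptors) // 2 + 1

    # Phase 1: collect promises
    promises = [a.prepare(round_num) for a in acceptors]
    granted = [p for p in promises if p[0] == "PROMISE"]
    if len(granted) < majority:
        return None  # lost phase 1; a real proposer retries with a higher round

    # Safety rule: adopt any value an acceptor has already accepted
    already_accepted = [v for _, v in granted if v is not None]
    if already_accepted:
        value = already_accepted[0]

    # Phase 2: ask the acceptors to accept the value
    acks = sum(1 for a in acceptors if a.accept(round_num, value) == "ACCEPTED")
    return value if acks >= majority else None

acceptors = [PaxosNode(i) for i in range(5)]
print(propose(acceptors, round_num=1, value="commit-tx-42"))  # commit-tx-42
```

Once a majority has accepted a value, any later proposer's prepare phase will surface it, so the decision can never silently change — that is the heart of Paxos safety.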

5.2 Raft (designed to be understandable)

Raft consists of three main components:

  • Leader Election
  • Log Replication
  • Safety

import enum
import time
import random

class NodeState(enum.Enum):
    FOLLOWER = 1
    CANDIDATE = 2
    LEADER = 3

class RaftNode:
    def __init__(self, node_id, nodes):
        self.node_id = node_id
        self.nodes = nodes  # The other nodes
        self.state = NodeState.FOLLOWER
        self.current_term = 0
        self.voted_for = None
        self.log = []
        self.commit_index = 0
        self.last_applied = 0

        # Election timeout: 150-300ms random
        self.election_timeout = random.uniform(0.15, 0.3)
        self.last_heartbeat = time.time()

    def tick(self):
        # Runs on every tick
        if self.state == NodeState.LEADER:
            # The leader sends heartbeats to everyone
            self.send_heartbeats()
        else:
            # Follower: check the election timeout
            if time.time() - self.last_heartbeat > self.election_timeout:
                self.start_election()

    def start_election(self):
        self.state = NodeState.CANDIDATE
        self.current_term += 1
        self.voted_for = self.node_id

        # Request votes from the other nodes
        votes = 1  # Its own vote
        for node in self.nodes:
            if node.request_vote(self.current_term, self.node_id):
                votes += 1

        # Become leader with a majority of the votes
        if votes > len(self.nodes) // 2:
            self.state = NodeState.LEADER
            print(f"Node {self.node_id} became leader (term: {self.current_term})")

    def request_vote(self, term, candidate_id):
        # Voting logic
        if term > self.current_term:
            self.current_term = term
            self.state = NodeState.FOLLOWER
            self.voted_for = None

        if (term == self.current_term and 
            (self.voted_for is None or self.voted_for == candidate_id)):
            self.voted_for = candidate_id
            self.last_heartbeat = time.time()  # Reset the election timeout
            return True
        return False

6. Distributed Transactions

Transactions that span more than one node.

6.1 Two-Phase Commit (2PC)

class TwoPhaseCommit:
    def __init__(self, participants):
        self.participants = participants  # Participating nodes

    def execute_transaction(self, transaction):
        # Phase 1: Prepare
        print("Phase 1: PREPARE")
        all_ready = True

        for participant in self.participants:
            try:
                participant.prepare(transaction)
                print(f"  {participant.id}: READY")
            except Exception as e:
                print(f"  {participant.id}: ABORT ({e})")
                all_ready = False

        # Phase 2: Commit or Abort
        if all_ready:
            print("Phase 2: COMMIT")
            for participant in self.participants:
                participant.commit(transaction)
        else:
            print("Phase 2: ABORT")
            for participant in self.participants:
                participant.abort(transaction)

# Problem: what happens if the coordinator crashes?
# One answer: Three-Phase Commit (3PC)
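The comment above is worth making concrete. In 2PC, a participant that has voted READY must hold its locks until it hears COMMIT or ABORT; if the coordinator dies in between, every participant is stuck. A minimal sketch (all names hypothetical) of that blocked state:

```python
class Participant:
    """Hypothetical 2PC participant: IDLE -> PREPARED -> COMMITTED/ABORTED."""
    def __init__(self, pid):
        self.pid = pid
        self.state = "IDLE"

    def prepare(self):
        # Locks are taken here and must be held until the outcome is known
        self.state = "PREPARED"
        return "READY"

    def commit(self):
        self.state = "COMMITTED"

participants = [Participant(i) for i in range(3)]

# Phase 1 completes...
for p in participants:
    p.prepare()

# ...then the coordinator crashes before sending any Phase 2 message.
# Each participant is now blocked: it cannot unilaterally commit (another
# participant may have voted ABORT) and cannot unilaterally abort (the
# coordinator may already have decided COMMIT).
print([p.state for p in participants])  # ['PREPARED', 'PREPARED', 'PREPARED']
```

This blocking window is exactly what 3PC and coordinator-recovery protocols try to shrink, and why long-lived workflows usually prefer the Saga pattern below instead.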

6.2 Saga Pattern (long-lived transactions)

class SagaOrchestrator:
    def __init__(self):
        self.steps = []
        self.compensations = []

    def add_step(self, action, compensation):
        self.steps.append(action)
        self.compensations.append(compensation)

    def execute(self):
        completed = []

        try:
            for i, step in enumerate(self.steps):
                print(f"Executing step {i}")
                step()
                completed.append(i)

            print("Saga completed successfully!")

        except Exception as e:
            print(f"Error at step {len(completed)}: {e}")
            print("Starting compensation...")

            # Run compensations in reverse order
            for i in reversed(completed):
                self.compensations[i]()

            print("Compensation completed")

# Usage: order - payment - stock
saga = SagaOrchestrator()
saga.add_step(
    lambda: create_order(),
    lambda: cancel_order()
)
saga.add_step(
    lambda: process_payment(),
    lambda: refund_payment()
)
saga.add_step(
    lambda: reserve_stock(),
    lambda: release_stock()
)
saga.execute()

7. Distributed Locking

7.1 Distributed Lock with Redis

Note: the sketch below is a single-instance lock built on SET NX; the multi-instance algorithm known as Redlock extends this idea across several independent Redis nodes.

import redis
import time
import uuid

class RedisDistributedLock:
    def __init__(self, redis_client, lock_name, ttl=10):
        self.redis = redis_client
        self.lock_name = f"lock:{lock_name}"
        self.lock_value = str(uuid.uuid4())
        self.ttl = ttl

    def acquire(self, blocking=True, timeout=10):
        start_time = time.time()

        while True:
            # SET with NX: set only if the key does not exist
            # PX: expiry in milliseconds
            acquired = self.redis.set(
                self.lock_name, 
                self.lock_value,
                nx=True, 
                px=self.ttl * 1000
            )

            if acquired:
                return True

            if not blocking:
                return False

            if timeout and (time.time() - start_time) > timeout:
                return False

            time.sleep(0.1)

    def release(self):
        # Atomic release via a Lua script:
        # only the client that acquired the lock may release it
        lua_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """

        self.redis.eval(
            lua_script, 
            1, 
            self.lock_name, 
            self.lock_value
        )

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, *args):
        self.release()

# Usage
redis_client = redis.Redis(host='localhost', port=6379)

with RedisDistributedLock(redis_client, "resource-1"):
    # Critical section
    # Only one node can be here at a time
    update_database()

7.2 Distributed Lock with ZooKeeper

from kazoo.client import KazooClient
from kazoo.recipe.lock import Lock

class ZooKeeperDistributedLock:
    def __init__(self, zk_hosts, lock_path):
        self.zk = KazooClient(hosts=zk_hosts)
        self.zk.start()
        self.lock = Lock(self.zk, lock_path)

    def execute_with_lock(self, func, *args, **kwargs):
        with self.lock:
            # Critical section
            return func(*args, **kwargs)

    def __del__(self):
        self.zk.stop()

# Usage
zk_lock = ZooKeeperDistributedLock('localhost:2181', '/myapp/locks/resource1')

def critical_operation(user_id):
    print(f"User {user_id} in critical section")
    time.sleep(2)
    return "OK"

# 10 threads may run this for different users,
# but critical_operation executes only one at a time

8. Distributed Caching

8.1 Consistent Hashing

import hashlib
import bisect

class ConsistentHashRing:
    def __init__(self, nodes=None, replicas=3):
        self.replicas = replicas  # Virtual nodes per physical node
        self.ring = {}
        self.sorted_keys = []

        if nodes:
            for node in nodes:
                self.add_node(node)

    def _hash(self, key):
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def add_node(self, node):
        for i in range(self.replicas):
            virtual_key = f"{node}:{i}"
            hash_key = self._hash(virtual_key)
            self.ring[hash_key] = node
            bisect.insort(self.sorted_keys, hash_key)

    def remove_node(self, node):
        for i in range(self.replicas):
            virtual_key = f"{node}:{i}"
            hash_key = self._hash(virtual_key)
            del self.ring[hash_key]
            self.sorted_keys.remove(hash_key)

    def get_node(self, key):
        if not self.ring:
            return None

        hash_key = self._hash(key)

        # Binary search: find the first key greater than hash_key
        idx = bisect.bisect(self.sorted_keys, hash_key)

        if idx == len(self.sorted_keys):
            idx = 0  # Wrap around

        return self.ring[self.sorted_keys[idx]]

# Usage
cache_nodes = ['cache1', 'cache2', 'cache3']
ring = ConsistentHashRing(cache_nodes, replicas=100)

# Which node should a given key go to?
node = ring.get_node('user:12345')
print(f"user:12345 -> {node}")

# Add/remove nodes
ring.remove_node('cache2')
# Only ~1/3 of the keys are remapped (the ideal case)
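That "~1/3 of the keys" claim is easy to check empirically. A self-contained sketch (a compact copy of the ring class above is repeated so the snippet runs on its own) counts how many of 10,000 keys change node when one of three nodes is removed:

```python
import hashlib
import bisect

class ConsistentHashRing:
    # Compact copy of the ring above, so this measurement runs standalone
    def __init__(self, nodes, replicas=100):
        self.replicas = replicas
        self.ring = {}
        self.sorted_keys = []
        for node in nodes:
            self.add_node(node)

    def _hash(self, key):
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def add_node(self, node):
        for i in range(self.replicas):
            h = self._hash(f"{node}:{i}")
            self.ring[h] = node
            bisect.insort(self.sorted_keys, h)

    def remove_node(self, node):
        for i in range(self.replicas):
            h = self._hash(f"{node}:{i}")
            del self.ring[h]
            self.sorted_keys.remove(h)

    def get_node(self, key):
        h = self._hash(key)
        idx = bisect.bisect(self.sorted_keys, h)
        if idx == len(self.sorted_keys):
            idx = 0  # wrap around
        return self.ring[self.sorted_keys[idx]]

keys = [f"user:{i}" for i in range(10_000)]
ring = ConsistentHashRing(['cache1', 'cache2', 'cache3'])
before = {k: ring.get_node(k) for k in keys}

ring.remove_node('cache2')
after = {k: ring.get_node(k) for k in keys}

moved = sum(1 for k in keys if before[k] != after[k])
print(f"moved: {moved / len(keys):.1%}")  # roughly a third of the keys
```

Note the second property the measurement demonstrates: only keys that lived on the removed node move; every other key stays exactly where it was. With naive `hash(key) % len(nodes)` sharding, almost all keys would be remapped.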

8.2 Distributed Cache with Redis Cluster

from rediscluster import RedisCluster

class DistributedCache:
    def __init__(self, startup_nodes):
        # Redis Cluster: Automatic sharding
        self.rc = RedisCluster(
            startup_nodes=startup_nodes,
            decode_responses=True,
            skip_full_coverage_check=True
        )

    def set_user_session(self, user_id, session_data):
        # Redis Cluster routes the key to the correct node automatically
        key = f"session:{user_id}"
        self.rc.hset(key, mapping=session_data)  # hmset is deprecated
        self.rc.expire(key, 3600)  # 1 hour

    def get_user_session(self, user_id):
        key = f"session:{user_id}"
        return self.rc.hgetall(key)

    def invalidate_user_session(self, user_id):
        key = f"session:{user_id}"
        self.rc.delete(key)

# Usage
startup_nodes = [
    {'host': 'redis-node1', 'port': 7000},
    {'host': 'redis-node2', 'port': 7000},
    {'host': 'redis-node3', 'port': 7000},
]

cache = DistributedCache(startup_nodes)
cache.set_user_session(12345, {'name': 'Ahmet', 'cart': '3 items'})

9. Distributed Messaging

9.1 Event Streaming with Apache Kafka

from kafka import KafkaProducer, KafkaConsumer
import json
import time

class EventBus:
    def __init__(self, bootstrap_servers):
        self.bootstrap_servers = bootstrap_servers
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )

    def publish(self, topic, event):
        future = self.producer.send(topic, event)
        # Block until the send is acknowledged (optional)
        result = future.get(timeout=10)
        print(f"Published to {topic}: partition={result.partition}, offset={result.offset}")
        return result

    def create_consumer(self, topic, group_id):
        consumer = KafkaConsumer(
            topic,
            bootstrap_servers=self.bootstrap_servers,
            group_id=group_id,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            auto_offset_reset='earliest',
            enable_auto_commit=True
        )
        return consumer

# Order Service (Producer)
event_bus = EventBus(['localhost:9092'])

def order_created(order):
    event = {
        'type': 'ORDER_CREATED',
        'order_id': order.id,
        'user_id': order.user_id,
        'total': order.total,
        'timestamp': time.time()
    }
    event_bus.publish('orders', event)

# Inventory Service (Consumer)
consumer = event_bus.create_consumer('orders', 'inventory-service')

for message in consumer:
    event = message.value
    if event['type'] == 'ORDER_CREATED':
        print(f"Updating stock: Order {event['order_id']}")
        update_inventory(event['order_id'])

9.2 Message Queue with RabbitMQ

import pika
import json
import time

class MessageQueue:
    def __init__(self, host='localhost'):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host)
        )
        self.channel = self.connection.channel()

    def declare_queue(self, queue_name, durable=True):
        self.channel.queue_declare(queue=queue_name, durable=durable)

    def publish(self, queue_name, message):
        self.channel.basic_publish(
            exchange='',
            routing_key=queue_name,
            body=json.dumps(message),
            properties=pika.BasicProperties(
                delivery_mode=2,  # Persistent message
            )
        )

    def consume(self, queue_name, callback):
        self.channel.basic_consume(
            queue=queue_name,
            on_message_callback=callback,
            auto_ack=False  # Manual ack
        )
        print('Waiting for messages...')
        self.channel.start_consuming()

    def close(self):
        self.connection.close()

# Callback function
def process_email(ch, method, properties, body):
    message = json.loads(body)
    print(f"Sending email to {message['email']}")
    # Send the email here
    time.sleep(1)
    ch.basic_ack(delivery_tag=method.delivery_tag)

# Usage
mq = MessageQueue()
mq.declare_queue('email_queue')

# Publisher
mq.publish('email_queue', {
    'email': 'user@example.com',
    'subject': 'Welcome',
    'body': '...'
})

# Consumer (run in a separate process)
mq.consume('email_queue', process_email)

10. Distributed Tracing

from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
import requests

class DistributedTracer:
    def __init__(self, service_name):
        trace.set_tracer_provider(TracerProvider())
        self.tracer = trace.get_tracer(service_name)

        # Jaeger exporter (class name per opentelemetry-exporter-jaeger-thrift)
        jaeger_exporter = JaegerExporter(
            agent_host_name='localhost',
            agent_port=6831
        )

        span_processor = BatchSpanProcessor(jaeger_exporter)
        trace.get_tracer_provider().add_span_processor(span_processor)

    def trace_request(self, name):
        return self.tracer.start_as_current_span(name)

    def http_call_with_tracing(self, url, headers=None):
        with self.tracer.start_as_current_span(f"http:{url}") as span:
            # Put the trace context into the outgoing headers
            ctx = span.get_span_context()

            if headers is None:
                headers = {}

            # W3C Trace Context header (trace/span ids are hex-encoded)
            headers['traceparent'] = f"00-{ctx.trace_id:032x}-{ctx.span_id:016x}-01"

            # Make the HTTP call
            response = requests.get(url, headers=headers)
            span.set_attribute("http.status_code", response.status_code)

            return response

# Service A
tracer = DistributedTracer("order-service")

with tracer.trace_request("create_order"):
    # Call Service B
    response = tracer.http_call_with_tracing(
        "http://payment-service/api/pay"
    )

11. A Real-World Scenario: E-Commerce Microservice Architecture

# complete_ecommerce_system.py

import asyncio
import json
import time
import uuid
from dataclasses import dataclass
from typing import Dict, List
import random

# ============== MICROSERVICES ==============

@dataclass
class Order:
    id: str
    user_id: str
    items: List[Dict]
    total: float
    status: str

class OrderService:
    def __init__(self):
        self.orders = {}
        self.event_bus = EventBus()  # A shared bus should be injected in a real system
        self.cache = {}              # Simple in-memory stand-in for a distributed cache

    async def create_order(self, user_id, items):
        # 1. Generate order ID
        order_id = str(uuid.uuid4())

        # 2. Calculate total
        total = sum(item['price'] * item['quantity'] for item in items)

        # 3. Create order
        order = Order(
            id=order_id,
            user_id=user_id,
            items=items,
            total=total,
            status='PENDING'
        )

        # 4. Write to the cache
        self.cache[f"order:{order_id}"] = order.__dict__

        # 5. Publish the event (start of the saga)
        await self.event_bus.publish('order.created', {
            'order_id': order_id,
            'user_id': user_id,
            'items': items,
            'total': total
        })

        return order

class PaymentService:
    def __init__(self):
        self.event_bus = EventBus()

    async def process_payment(self, order_id, user_id, amount):
        # Simulate payment processing
        await asyncio.sleep(random.uniform(0.1, 0.3))

        # 90% success rate
        success = random.random() < 0.9

        if success:
            await self.event_bus.publish('payment.succeeded', {
                'order_id': order_id,
                'user_id': user_id,
                'amount': amount
            })
        else:
            await self.event_bus.publish('payment.failed', {
                'order_id': order_id,
                'user_id': user_id,
                'reason': 'Insufficient funds'
            })

class InventoryService:
    def __init__(self):
        self.stock = {
            'product1': 10,
            'product2': 5,
            'product3': 0
        }
        self.event_bus = EventBus()   # A shared bus should be injected in a real system
        self._lock = asyncio.Lock()   # In-process stand-in for a distributed lock

    async def reserve_stock(self, order_id, items):
        # Protect the stock with a lock (across real nodes this would be a
        # distributed lock, e.g. the Redis lock from section 7)
        async with self._lock:
            for item in items:
                product_id = item['product_id']
                quantity = item['quantity']

                if self.stock.get(product_id, 0) < quantity:
                    # Insufficient stock, trigger compensation
                    await self.event_bus.publish('inventory.failed', {
                        'order_id': order_id,
                        'product_id': product_id,
                        'reason': 'Out of stock'
                    })
                    return False

                self.stock[product_id] -= quantity

            # Stock reserved successfully
            await self.event_bus.publish('inventory.reserved', {
                'order_id': order_id,
                'items': items
            })
            return True

# ============== EVENT BUS ==============

class EventBus:
    def __init__(self):
        self.subscribers = {}

    def subscribe(self, event_type, handler):
        if event_type not in self.subscribers:
            self.subscribers[event_type] = []
        self.subscribers[event_type].append(handler)

    async def publish(self, event_type, data):
        print(f"\n[EVENT] {event_type}: {json.dumps(data)}")

        if event_type in self.subscribers:
            for handler in self.subscribers[event_type]:
                # Invoke the handler asynchronously
                asyncio.create_task(handler(data))

# ============== SAGA ORCHESTRATOR ==============

class OrderSagaOrchestrator:
    def __init__(self, order_svc, payment_svc, inventory_svc, event_bus):
        self.order_svc = order_svc
        self.payment_svc = payment_svc
        self.inventory_svc = inventory_svc
        self.event_bus = event_bus
        self.saga_state = {}

        # Event handlers
        event_bus.subscribe('order.created', self.on_order_created)
        event_bus.subscribe('inventory.reserved', self.on_inventory_reserved)
        event_bus.subscribe('payment.succeeded', self.on_payment_succeeded)
        event_bus.subscribe('payment.failed', self.on_payment_failed)
        event_bus.subscribe('inventory.failed', self.on_inventory_failed)

    async def on_order_created(self, data):
        order_id = data['order_id']
        self.saga_state[order_id] = {
            'status': 'INVENTORY_RESERVING',
            'data': data
        }

        # Step 1: Reserve inventory
        await self.inventory_svc.reserve_stock(
            order_id, 
            data['items']
        )

    async def on_inventory_reserved(self, data):
        order_id = data['order_id']
        self.saga_state[order_id]['status'] = 'PAYMENT_PROCESSING'

        # Step 2: Process payment
        await self.payment_svc.process_payment(
            order_id,
            self.saga_state[order_id]['data']['user_id'],
            self.saga_state[order_id]['data']['total']
        )

    async def on_payment_succeeded(self, data):
        order_id = data['order_id']
        self.saga_state[order_id]['status'] = 'COMPLETED'

        print(f"\n✅ Order {order_id} completed successfully!")

        # Final step: Update order status
        # (in real system: update database)

    async def on_payment_failed(self, data):
        order_id = data['order_id']
        print(f"\n❌ Order {order_id} failed: {data['reason']}")

        # COMPENSATION: Release inventory
        await self.event_bus.publish('inventory.release', {
            'order_id': order_id,
            'reason': 'payment_failed'
        })

    async def on_inventory_failed(self, data):
        order_id = data['order_id']
        print(f"\n❌ Order {order_id} failed: insufficient stock")

        # No compensation needed (nothing happened yet)
        self.saga_state[order_id]['status'] = 'FAILED'

# ============== LOAD BALANCER ==============

class LoadBalancer:
    def __init__(self, nodes):
        self.nodes = nodes
        self.current = 0

    def round_robin(self):
        # Round-robin load balancing
        node = self.nodes[self.current]
        self.current = (self.current + 1) % len(self.nodes)
        return node

    def least_connections(self, connections):
        # Least connections
        return min(self.nodes, key=lambda n: connections.get(n, 0))

    def weighted_random(self, weights):
        # Weighted random
        total = sum(weights.values())
        r = random.uniform(0, total)
        upto = 0
        for node, weight in weights.items():
            if upto + weight >= r:
                return node
            upto += weight

# ============== MAIN SIMULATION ==============

async def simulate_ecommerce():
    print("Starting the distributed e-commerce system...\n")

    # Initialize services and wire them all to one shared event bus
    # (otherwise each service publishes to its own private bus and the
    # saga orchestrator never sees the events)
    event_bus = EventBus()
    order_svc = OrderService()
    payment_svc = PaymentService()
    inventory_svc = InventoryService()
    order_svc.event_bus = event_bus
    payment_svc.event_bus = event_bus
    inventory_svc.event_bus = event_bus

    # Initialize saga orchestrator
    saga = OrderSagaOrchestrator(
        order_svc, payment_svc, inventory_svc, event_bus
    )

    # Simulate multiple users
    users = ['user1', 'user2', 'user3', 'user4', 'user5']
    products = [
        {'product_id': 'product1', 'price': 100, 'quantity': 1},
        {'product_id': 'product2', 'price': 50, 'quantity': 2},
        {'product_id': 'product3', 'price': 200, 'quantity': 1},  # Out of stock
    ]

    # Concurrent orders
    tasks = []
    for i, user in enumerate(users):
        # Random product selection
        items = random.sample(products, random.randint(1, 2))
        task = order_svc.create_order(user, items)
        tasks.append(task)
        await asyncio.sleep(0.1)  # Stagger orders

    # Wait for all orders to complete
    orders = await asyncio.gather(*tasks)

    print(f"\n{len(orders)} orders created")
    print("Sagas running...")

    # Let sagas complete
    await asyncio.sleep(3)

    # Final status
    completed = sum(1 for o in orders if saga.saga_state.get(o.id, {}).get('status') == 'COMPLETED')
    failed = len(orders) - completed

    print("\nResults:")
    print(f"  ✅ Succeeded: {completed}")
    print(f"  ❌ Failed: {failed}")

# Run simulation
if __name__ == "__main__":
    asyncio.run(simulate_ecommerce())

12. Best Practices for Distributed Systems

12.1 Circuit Breaker Pattern

class CircuitBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = 'CLOSED'  # CLOSED, OPEN, HALF_OPEN

    def call(self, func, *args, **kwargs):
        if self.state == 'OPEN':
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = 'HALF_OPEN'
            else:
                raise Exception("Circuit breaker is OPEN")

        try:
            result = func(*args, **kwargs)
            if self.state == 'HALF_OPEN':
                self.state = 'CLOSED'
                self.failure_count = 0
            return result
        except Exception as e:
            self.failure_count += 1
            self.last_failure_time = time.time()

            if self.failure_count >= self.failure_threshold:
                self.state = 'OPEN'

            raise e

# Usage
cb = CircuitBreaker(failure_threshold=3)

def call_payment_service():
    # Risky external call
    response = requests.get('http://payment-service/api/pay')
    return response.json()

try:
    result = cb.call(call_payment_service)
except Exception:
    result = fallback_payment_method()

12.2 Retry with Exponential Backoff

import time
import random

def retry_with_backoff(func, max_retries=5, base_delay=1):
    for attempt in range(max_retries):
        try:
            return func()
        except Exception as e:
            if attempt == max_retries - 1:
                raise e

            # Exponential backoff with jitter
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            print(f"Attempt {attempt + 1} failed. Retrying in {delay:.2f}s...")
            time.sleep(delay)

# Usage
def call_database():
    # Risky operation
    return db.query("SELECT * FROM users")

result = retry_with_backoff(call_database)

12.3 Idempotency

class IdempotentAPI:
    def __init__(self):
        self.processed_requests = {}  # In production this should live in Redis

    def handle_request(self, request_id, func):
        # Has this request_id already been processed?
        if request_id in self.processed_requests:
            print(f"Request {request_id} already processed")
            return self.processed_requests[request_id]

        # First time: process it
        result = func()
        self.processed_requests[request_id] = result
        return result

# Usage
api = IdempotentAPI()

# If the same request arrives twice
result1 = api.handle_request('req123', process_payment)
result2 = api.handle_request('req123', process_payment)  # Returns the same result

13. Testing Strategies in Distributed Systems

13.1 Chaos Engineering (Netflix Chaos Monkey)

import random
import time

class ChaosMonkey:
    def __init__(self, services, chaos_level=0.1):
        self.services = services
        self.chaos_level = chaos_level

    def inject_failure(self):
        if random.random() < self.chaos_level:
            target = random.choice(self.services)
            print(f"Chaos Monkey attacking {target}!")

            failure_type = random.choice([
                'kill_service',
                'network_latency',
                'packet_loss'
            ])

            if failure_type == 'kill_service':
                self.kill_service(target)
            elif failure_type == 'network_latency':
                self.add_latency(target, 2000)  # 2 second delay
            elif failure_type == 'packet_loss':
                self.drop_packets(target, 0.3)  # 30% packet loss

    def kill_service(self, service):
        print(f"Killing {service}...")
        # In production: Actually kill the service!
        # service.stop()

    def add_latency(self, service, latency_ms):
        print(f"Adding {latency_ms}ms latency to {service}")
        # Add network latency

    def drop_packets(self, service, drop_rate):
        print(f"Dropping {drop_rate*100}% packets to {service}")

# During a test run
chaos = ChaosMonkey(['order-service', 'payment-service', 'inventory-service'])

while True:
    chaos.inject_failure()
    time.sleep(10)  # Every 10 seconds
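The `add_latency` stub above only prints. On Linux, real egress latency can be injected with `tc netem`; a hedged sketch (the interface name is an assumption, and actually running the command requires root):

```python
import subprocess

def build_netem_latency_cmd(interface, latency_ms):
    """Build the `tc netem` command that delays all egress traffic
    on the given interface."""
    return ["tc", "qdisc", "add", "dev", interface,
            "root", "netem", "delay", f"{latency_ms}ms"]

def inject_latency(interface, latency_ms, dry_run=True):
    cmd = build_netem_latency_cmd(interface, latency_ms)
    if dry_run:
        print("Would run:", " ".join(cmd))
        return cmd
    subprocess.run(cmd, check=True)  # requires root privileges
    return cmd

# Usage (dry run: only prints the command)
inject_latency("eth0", 2000)
```

Undo it afterwards with `tc qdisc del dev eth0 root netem`; chaos experiments should always have a rollback step.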

13.2 Jepsen-style Tests

# Consistency test (start_cluster / get_leader are illustrative helpers)
def test_consistency():
    nodes = start_cluster(5)

    # Write to the leader
    leader = get_leader(nodes)
    leader.write('x', 1)

    # Kill the leader
    leader.stop()

    # Read from the surviving nodes (skip the killed leader)
    values = [node.read('x') for node in nodes if node is not leader]

    # All surviving nodes should eventually see x = 1
    assert all(v == 1 for v in values), "Consistency violation!"

14. Dağıtık Sistemlerde Monitoring

import asyncio
import time
from contextlib import contextmanager

from prometheus_client import Counter, Histogram, Gauge

# Metrics (label names must be declared up front)
request_count = Counter('http_requests_total', 'Total HTTP requests', ['endpoint'])
request_duration = Histogram('http_request_duration_seconds', 'Request duration', ['endpoint'])
active_requests = Gauge('active_requests', 'Active requests')
service_health = Gauge('service_health', 'Service health (1=up, 0=down)', ['service'])

class DistributedMonitoring:
    def __init__(self, service_name):
        self.service_name = service_name
        self.heartbeat_interval = 5  # seconds

    async def send_heartbeat(self):
        while True:
            service_health.labels(service=self.service_name).set(1)
            await asyncio.sleep(self.heartbeat_interval)

    @contextmanager
    def monitor_request(self, endpoint):
        start = time.time()
        active_requests.inc()
        request_count.labels(endpoint=endpoint).inc()

        try:
            yield
        finally:
            duration = time.time() - start
            request_duration.labels(endpoint=endpoint).observe(duration)
            active_requests.dec()

    def check_health(self):
        # Health check endpoint payload
        return {
            'service': self.service_name,
            'status': 'healthy',
            'timestamp': time.time(),
            # _value is a private prometheus_client attribute; for illustration only
            'active_requests': active_requests._value.get()
        }
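`send_heartbeat` only reports liveness; the other half is a detector that marks a service down when heartbeats stop arriving. A minimal, dependency-free sketch (the timeout value is illustrative):

```python
import time

class HeartbeatMonitor:
    """Marks a service as down if no heartbeat arrived within `timeout` seconds."""

    def __init__(self, timeout=15):
        self.timeout = timeout
        self.last_seen = {}  # service name -> timestamp of last heartbeat

    def heartbeat(self, service, now=None):
        # Record a heartbeat; `now` can be injected for testing
        self.last_seen[service] = now if now is not None else time.time()

    def is_healthy(self, service, now=None):
        now = now if now is not None else time.time()
        last = self.last_seen.get(service)
        return last is not None and (now - last) <= self.timeout

# Usage
monitor = HeartbeatMonitor(timeout=15)
monitor.heartbeat('payment-service', now=100.0)
print(monitor.is_healthy('payment-service', now=110.0))  # True: 10s <= 15s
print(monitor.is_healthy('payment-service', now=120.0))  # False: 20s > 15s
```

In practice the timeout should be several multiples of the heartbeat interval, so a single delayed packet does not trigger a false "down" alert.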

Summary: A Distributed Systems Checklist

At Design Time

  • [ ] Understand the CAP theorem: consistency or availability?
  • [ ] Pick a consensus algorithm: Raft (understandable) or Paxos (complex)?
  • [ ] Choose a data partitioning strategy: consistent hashing or range partitioning?
  • [ ] Choose a replication factor: how many copies? (usually 3)
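The consistent hashing item above fits in a few lines: keys and nodes are hashed onto a ring, and a key belongs to the first node clockwise from its hash. A minimal sketch without virtual nodes (production implementations add them for better balance):

```python
import bisect
import hashlib

class ConsistentHashRing:
    """Minimal consistent hash ring (no virtual nodes)."""

    def __init__(self, nodes=()):
        self.ring = []  # sorted list of (hash, node) pairs
        for node in nodes:
            self.add_node(node)

    def _hash(self, key):
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def add_node(self, node):
        bisect.insort(self.ring, (self._hash(node), node))

    def remove_node(self, node):
        self.ring.remove((self._hash(node), node))

    def get_node(self, key):
        if not self.ring:
            return None
        # First node clockwise from the key's position, wrapping around
        idx = bisect.bisect(self.ring, (self._hash(key),))
        return self.ring[idx % len(self.ring)][1]

# Usage: removing a node only remaps the keys that lived on it
ring = ConsistentHashRing(['node1', 'node2', 'node3'])
owner = ring.get_node('user:42')
```

This is why consistent hashing matters: with naive `hash(key) % N`, removing one node remaps almost every key; here only the departed node's keys move.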

During Development

  • [ ] Idempotency: what happens if the same request arrives twice?
  • [ ] Retry logic: exponential backoff + jitter
  • [ ] Circuit breaker: protect the system when dependencies fail
  • [ ] Distributed tracing: instrument with OpenTelemetry
  • [ ] Health checks: every service should expose a health endpoint

In Operations

  • [ ] Monitoring: Prometheus + Grafana
  • [ ] Logging: centralized logging (ELK, Loki)
  • [ ] Alerting: get notified on critical conditions
  • [ ] Chaos engineering: inject failures regularly
  • [ ] Backup & recovery: regular backups and a disaster recovery plan

Common Mistakes

  • ❌ Assuming the network is reliable
  • ❌ Trusting clock synchronization (even NTP is not enough)
  • ❌ Ignoring split-brain scenarios
  • ❌ Trying to solve distributed transactions with 2PC
  • ❌ Leaving monitoring for later

Remember: in a distributed system everything can fail. Network, disk, CPU, memory, even time. Design for the worst case and assume every component will eventually fail.